rocketmq_admin_core/core/
cache.rs1use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use std::time::Instant;
24
25use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
26use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
27use tokio::sync::RwLock;
28
29#[derive(Clone)]
31struct CacheEntry<T> {
32 value: T,
33 expires_at: Instant,
34}
35
36impl<T> CacheEntry<T> {
37 fn new(value: T, ttl: Duration) -> Self {
38 Self {
39 value,
40 expires_at: Instant::now() + ttl,
41 }
42 }
43
44 fn is_expired(&self) -> bool {
45 Instant::now() >= self.expires_at
46 }
47}
48
49pub struct RocketMQCache {
51 cluster_info: Arc<RwLock<Option<CacheEntry<ClusterInfo>>>>,
52 topic_routes: Arc<RwLock<HashMap<String, CacheEntry<TopicRouteData>>>>,
53 cluster_ttl: Duration,
54 route_ttl: Duration,
55}
56
57impl Default for RocketMQCache {
58 fn default() -> Self {
59 Self::new(Duration::from_secs(300), Duration::from_secs(60))
60 }
61}
62
63impl RocketMQCache {
64 pub fn new(cluster_ttl: Duration, route_ttl: Duration) -> Self {
66 Self {
67 cluster_info: Arc::new(RwLock::new(None)),
68 topic_routes: Arc::new(RwLock::new(HashMap::new())),
69 cluster_ttl,
70 route_ttl,
71 }
72 }
73
74 pub async fn get_cluster_info(&self) -> Option<ClusterInfo> {
76 let cache: tokio::sync::RwLockReadGuard<'_, Option<CacheEntry<ClusterInfo>>> = self.cluster_info.read().await;
77 cache.as_ref().and_then(|entry: &CacheEntry<ClusterInfo>| {
78 if entry.is_expired() {
79 None
80 } else {
81 Some(entry.value.clone())
82 }
83 })
84 }
85
86 pub async fn set_cluster_info(&self, info: ClusterInfo) {
88 let mut cache = self.cluster_info.write().await;
89 *cache = Some(CacheEntry::new(info, self.cluster_ttl));
90 }
91
92 pub async fn get_topic_route(&self, topic: &str) -> Option<TopicRouteData> {
94 let cache = self.topic_routes.read().await;
95 cache.get(topic).and_then(|entry| {
96 if entry.is_expired() {
97 None
98 } else {
99 Some(entry.value.clone())
100 }
101 })
102 }
103
104 pub async fn set_topic_route(&self, topic: String, route: TopicRouteData) {
106 let mut cache = self.topic_routes.write().await;
107 cache.insert(topic, CacheEntry::new(route, self.route_ttl));
108 }
109
110 pub async fn clear(&self) {
112 let mut cluster_cache = self.cluster_info.write().await;
113 *cluster_cache = None;
114
115 let mut route_cache = self.topic_routes.write().await;
116 route_cache.clear();
117 }
118
119 pub async fn cleanup_expired(&self) {
121 {
123 let cache: tokio::sync::RwLockReadGuard<'_, Option<CacheEntry<ClusterInfo>>> =
124 self.cluster_info.read().await;
125 if let Some(entry) = cache.as_ref() {
126 if entry.is_expired() {
127 drop(cache);
128 let mut write_cache: tokio::sync::RwLockWriteGuard<'_, Option<CacheEntry<ClusterInfo>>> =
129 self.cluster_info.write().await;
130 *write_cache = None;
131 }
132 }
133 }
134
135 {
137 let cache = self.topic_routes.read().await;
138 let expired_keys: Vec<_> = cache
139 .iter()
140 .filter(|(_, entry)| entry.is_expired())
141 .map(|(key, _)| key.clone())
142 .collect();
143
144 if !expired_keys.is_empty() {
145 drop(cache);
146 let mut write_cache = self.topic_routes.write().await;
147 for key in expired_keys {
148 write_cache.remove(&key);
149 }
150 }
151 }
152 }
153
154 pub async fn stats(&self) -> CacheStats {
156 let cluster_cached: bool = {
157 let guard: tokio::sync::RwLockReadGuard<'_, Option<CacheEntry<ClusterInfo>>> =
158 self.cluster_info.read().await;
159 guard.is_some()
160 };
161 let routes_count = self.topic_routes.read().await.len();
162
163 CacheStats {
164 cluster_info_cached: cluster_cached,
165 topic_routes_count: routes_count,
166 }
167 }
168}
169
170#[derive(Debug, Clone)]
172pub struct CacheStats {
173 pub cluster_info_cached: bool,
174 pub topic_routes_count: usize,
175}
176
177#[cfg(test)]
178mod tests {
179 use super::*;
180
181 #[tokio::test]
182 async fn test_cache_expiration() {
183 let cache = RocketMQCache::new(Duration::from_millis(100), Duration::from_millis(100));
184
185 let cluster_info = ClusterInfo::default();
186 cache.set_cluster_info(cluster_info.clone()).await;
187
188 assert!(cache.get_cluster_info().await.is_some());
190
191 tokio::time::sleep(Duration::from_millis(150)).await;
193
194 assert!(cache.get_cluster_info().await.is_none());
196 }
197
198 #[tokio::test]
199 async fn test_cache_clear() {
200 let cache = RocketMQCache::default();
201
202 let cluster_info = ClusterInfo::default();
203 cache.set_cluster_info(cluster_info).await;
204
205 assert!(cache.get_cluster_info().await.is_some());
206
207 cache.clear().await;
208
209 assert!(cache.get_cluster_info().await.is_none());
210 }
211}