Skip to main content

rocketmq_admin_core/core/
cache.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Caching layer for performance optimization
16//!
17//! Provides in-memory caching for frequently accessed data like cluster info
18//! and topic routes to reduce network calls and improve response times.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use std::time::Instant;
24
25use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
26use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
27use tokio::sync::RwLock;
28
29/// Cache entry with expiration
30#[derive(Clone)]
31struct CacheEntry<T> {
32    value: T,
33    expires_at: Instant,
34}
35
36impl<T> CacheEntry<T> {
37    fn new(value: T, ttl: Duration) -> Self {
38        Self {
39            value,
40            expires_at: Instant::now() + ttl,
41        }
42    }
43
44    fn is_expired(&self) -> bool {
45        Instant::now() >= self.expires_at
46    }
47}
48
49/// Thread-safe cache for RocketMQ data
50pub struct RocketMQCache {
51    cluster_info: Arc<RwLock<Option<CacheEntry<ClusterInfo>>>>,
52    topic_routes: Arc<RwLock<HashMap<String, CacheEntry<TopicRouteData>>>>,
53    cluster_ttl: Duration,
54    route_ttl: Duration,
55}
56
57impl Default for RocketMQCache {
58    fn default() -> Self {
59        Self::new(Duration::from_secs(300), Duration::from_secs(60))
60    }
61}
62
63impl RocketMQCache {
64    /// Create new cache with custom TTL
65    pub fn new(cluster_ttl: Duration, route_ttl: Duration) -> Self {
66        Self {
67            cluster_info: Arc::new(RwLock::new(None)),
68            topic_routes: Arc::new(RwLock::new(HashMap::new())),
69            cluster_ttl,
70            route_ttl,
71        }
72    }
73
74    /// Get cached cluster info
75    pub async fn get_cluster_info(&self) -> Option<ClusterInfo> {
76        let cache: tokio::sync::RwLockReadGuard<'_, Option<CacheEntry<ClusterInfo>>> = self.cluster_info.read().await;
77        cache.as_ref().and_then(|entry: &CacheEntry<ClusterInfo>| {
78            if entry.is_expired() {
79                None
80            } else {
81                Some(entry.value.clone())
82            }
83        })
84    }
85
86    /// Cache cluster info
87    pub async fn set_cluster_info(&self, info: ClusterInfo) {
88        let mut cache = self.cluster_info.write().await;
89        *cache = Some(CacheEntry::new(info, self.cluster_ttl));
90    }
91
92    /// Get cached topic route
93    pub async fn get_topic_route(&self, topic: &str) -> Option<TopicRouteData> {
94        let cache = self.topic_routes.read().await;
95        cache.get(topic).and_then(|entry| {
96            if entry.is_expired() {
97                None
98            } else {
99                Some(entry.value.clone())
100            }
101        })
102    }
103
104    /// Cache topic route
105    pub async fn set_topic_route(&self, topic: String, route: TopicRouteData) {
106        let mut cache = self.topic_routes.write().await;
107        cache.insert(topic, CacheEntry::new(route, self.route_ttl));
108    }
109
110    /// Clear all cached data
111    pub async fn clear(&self) {
112        let mut cluster_cache = self.cluster_info.write().await;
113        *cluster_cache = None;
114
115        let mut route_cache = self.topic_routes.write().await;
116        route_cache.clear();
117    }
118
119    /// Remove expired entries
120    pub async fn cleanup_expired(&self) {
121        // Check cluster info
122        {
123            let cache: tokio::sync::RwLockReadGuard<'_, Option<CacheEntry<ClusterInfo>>> =
124                self.cluster_info.read().await;
125            if let Some(entry) = cache.as_ref() {
126                if entry.is_expired() {
127                    drop(cache);
128                    let mut write_cache: tokio::sync::RwLockWriteGuard<'_, Option<CacheEntry<ClusterInfo>>> =
129                        self.cluster_info.write().await;
130                    *write_cache = None;
131                }
132            }
133        }
134
135        // Check topic routes
136        {
137            let cache = self.topic_routes.read().await;
138            let expired_keys: Vec<_> = cache
139                .iter()
140                .filter(|(_, entry)| entry.is_expired())
141                .map(|(key, _)| key.clone())
142                .collect();
143
144            if !expired_keys.is_empty() {
145                drop(cache);
146                let mut write_cache = self.topic_routes.write().await;
147                for key in expired_keys {
148                    write_cache.remove(&key);
149                }
150            }
151        }
152    }
153
154    /// Get cache statistics
155    pub async fn stats(&self) -> CacheStats {
156        let cluster_cached: bool = {
157            let guard: tokio::sync::RwLockReadGuard<'_, Option<CacheEntry<ClusterInfo>>> =
158                self.cluster_info.read().await;
159            guard.is_some()
160        };
161        let routes_count = self.topic_routes.read().await.len();
162
163        CacheStats {
164            cluster_info_cached: cluster_cached,
165            topic_routes_count: routes_count,
166        }
167    }
168}
169
170/// Cache statistics
171#[derive(Debug, Clone)]
172pub struct CacheStats {
173    pub cluster_info_cached: bool,
174    pub topic_routes_count: usize,
175}
176
177#[cfg(test)]
178mod tests {
179    use super::*;
180
181    #[tokio::test]
182    async fn test_cache_expiration() {
183        let cache = RocketMQCache::new(Duration::from_millis(100), Duration::from_millis(100));
184
185        let cluster_info = ClusterInfo::default();
186        cache.set_cluster_info(cluster_info.clone()).await;
187
188        // Should be cached immediately
189        assert!(cache.get_cluster_info().await.is_some());
190
191        // Wait for expiration
192        tokio::time::sleep(Duration::from_millis(150)).await;
193
194        // Should be expired
195        assert!(cache.get_cluster_info().await.is_none());
196    }
197
198    #[tokio::test]
199    async fn test_cache_clear() {
200        let cache = RocketMQCache::default();
201
202        let cluster_info = ClusterInfo::default();
203        cache.set_cluster_info(cluster_info).await;
204
205        assert!(cache.get_cluster_info().await.is_some());
206
207        cache.clear().await;
208
209        assert!(cache.get_cluster_info().await.is_none());
210    }
211}