rocketmq_controller/storage/
mod.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18//! Storage backend abstraction
19//!
20//! This module provides a trait-based abstraction for different storage backends,
21//! allowing the controller to use either RocksDB or file-based storage.
22
23#[cfg(feature = "storage-rocksdb")]
24pub mod rocksdb_backend;
25
26#[cfg(feature = "storage-file")]
27pub mod file_backend;
28
29use std::path::PathBuf;
30
31use async_trait::async_trait;
32use serde::de::DeserializeOwned;
33use serde::Serialize;
34
35use crate::error::Result;
36
37/// Storage backend configuration
38#[derive(Debug, Clone)]
39pub enum StorageConfig {
40    /// RocksDB storage
41    #[cfg(feature = "storage-rocksdb")]
42    RocksDB {
43        /// Database path
44        path: PathBuf,
45    },
46
47    /// File-based storage
48    #[cfg(feature = "storage-file")]
49    File {
50        /// Data directory path
51        path: PathBuf,
52    },
53
54    /// In-memory storage (for testing)
55    Memory,
56}
57
58/// Storage backend trait
59///
60/// This trait abstracts over different storage implementations,
61/// providing a unified interface for storing and retrieving data.
62#[async_trait]
63pub trait StorageBackend: Send + Sync {
64    /// Put a key-value pair
65    async fn put(&self, key: &str, value: &[u8]) -> Result<()>;
66
67    /// Get a value by key
68    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>>;
69
70    /// Delete a key
71    async fn delete(&self, key: &str) -> Result<()>;
72
73    /// List all keys with a given prefix
74    async fn list_keys(&self, prefix: &str) -> Result<Vec<String>>;
75
76    /// Batch put multiple key-value pairs
77    async fn batch_put(&self, items: Vec<(String, Vec<u8>)>) -> Result<()>;
78
79    /// Batch delete multiple keys
80    async fn batch_delete(&self, keys: Vec<String>) -> Result<()>;
81
82    /// Check if a key exists
83    async fn exists(&self, key: &str) -> Result<bool>;
84
85    /// Clear all data (use with caution!)
86    async fn clear(&self) -> Result<()>;
87
88    /// Sync data to disk
89    async fn sync(&self) -> Result<()>;
90
91    /// Get storage statistics
92    async fn stats(&self) -> Result<StorageStats>;
93}
94
95/// Storage statistics
96#[derive(Debug, Clone, Default)]
97pub struct StorageStats {
98    /// Number of keys
99    pub key_count: usize,
100
101    /// Total size in bytes
102    pub total_size: u64,
103
104    /// Backend-specific info
105    pub backend_info: String,
106}
107
108/// Helper methods for storing/retrieving typed data
109#[async_trait]
110pub trait StorageBackendExt: StorageBackend {
111    /// Put a serializable value
112    async fn put_json<T: Serialize + Send + Sync>(&self, key: &str, value: &T) -> Result<()> {
113        let data = serde_json::to_vec(value)
114            .map_err(|e| crate::error::ControllerError::SerializationError(e.to_string()))?;
115        self.put(key, &data).await
116    }
117
118    /// Get and deserialize a value
119    async fn get_json<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
120        match self.get(key).await? {
121            Some(data) => {
122                let value = serde_json::from_slice(&data).map_err(|e| {
123                    crate::error::ControllerError::SerializationError(e.to_string())
124                })?;
125                Ok(Some(value))
126            }
127            None => Ok(None),
128        }
129    }
130
131    /// List all values with a given prefix
132    async fn list_json<T: DeserializeOwned + Send>(&self, prefix: &str) -> Result<Vec<T>> {
133        let keys = self.list_keys(prefix).await?;
134        let mut values = Vec::new();
135
136        for key in keys {
137            if let Some(data) = self.get(&key).await? {
138                let value: T = serde_json::from_slice(&data).map_err(|e| {
139                    crate::error::ControllerError::SerializationError(e.to_string())
140                })?;
141                values.push(value);
142            }
143        }
144
145        Ok(values)
146    }
147}
148
149// Blanket implementation for all StorageBackend implementors
150impl<T: StorageBackend + ?Sized> StorageBackendExt for T {}
151
152/// Create a storage backend based on configuration
153pub async fn create_storage(config: StorageConfig) -> Result<Box<dyn StorageBackend>> {
154    match config {
155        #[cfg(feature = "storage-rocksdb")]
156        StorageConfig::RocksDB { path } => {
157            let backend = rocksdb_backend::RocksDBBackend::new(path).await?;
158            Ok(Box::new(backend))
159        }
160
161        #[cfg(feature = "storage-file")]
162        StorageConfig::File { path } => {
163            let backend = file_backend::FileBackend::new(path).await?;
164            Ok(Box::new(backend))
165        }
166
167        StorageConfig::Memory => {
168            // For testing, use a simple in-memory implementation
169            use std::collections::HashMap;
170            use std::sync::Arc;
171
172            use parking_lot::RwLock;
173
174            #[derive(Clone)]
175            struct MemoryBackend {
176                data: Arc<RwLock<HashMap<String, Vec<u8>>>>,
177            }
178
179            #[async_trait]
180            impl StorageBackend for MemoryBackend {
181                async fn put(&self, key: &str, value: &[u8]) -> Result<()> {
182                    self.data.write().insert(key.to_string(), value.to_vec());
183                    Ok(())
184                }
185
186                async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
187                    Ok(self.data.read().get(key).cloned())
188                }
189
190                async fn delete(&self, key: &str) -> Result<()> {
191                    self.data.write().remove(key);
192                    Ok(())
193                }
194
195                async fn list_keys(&self, prefix: &str) -> Result<Vec<String>> {
196                    Ok(self
197                        .data
198                        .read()
199                        .keys()
200                        .filter(|k| k.starts_with(prefix))
201                        .cloned()
202                        .collect())
203                }
204
205                async fn batch_put(&self, items: Vec<(String, Vec<u8>)>) -> Result<()> {
206                    let mut data = self.data.write();
207                    for (key, value) in items {
208                        data.insert(key, value);
209                    }
210                    Ok(())
211                }
212
213                async fn batch_delete(&self, keys: Vec<String>) -> Result<()> {
214                    let mut data = self.data.write();
215                    for key in keys {
216                        data.remove(&key);
217                    }
218                    Ok(())
219                }
220
221                async fn exists(&self, key: &str) -> Result<bool> {
222                    Ok(self.data.read().contains_key(key))
223                }
224
225                async fn clear(&self) -> Result<()> {
226                    self.data.write().clear();
227                    Ok(())
228                }
229
230                async fn sync(&self) -> Result<()> {
231                    Ok(())
232                }
233
234                async fn stats(&self) -> Result<StorageStats> {
235                    let data = self.data.read();
236                    let total_size: u64 = data.values().map(|v| v.len() as u64).sum();
237                    Ok(StorageStats {
238                        key_count: data.len(),
239                        total_size,
240                        backend_info: "Memory".to_string(),
241                    })
242                }
243            }
244
245            Ok(Box::new(MemoryBackend {
246                data: Arc::new(RwLock::new(HashMap::new())),
247            }))
248        }
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use super::*;
255
256    #[tokio::test]
257    async fn test_memory_backend() {
258        let backend = create_storage(StorageConfig::Memory).await.unwrap();
259
260        // Test put and get
261        backend.put("test_key", b"test_value").await.unwrap();
262        let value = backend.get("test_key").await.unwrap();
263        assert_eq!(value, Some(b"test_value".to_vec()));
264
265        // Test exists
266        assert!(backend.exists("test_key").await.unwrap());
267        assert!(!backend.exists("nonexistent").await.unwrap());
268
269        // Test delete
270        backend.delete("test_key").await.unwrap();
271        assert!(!backend.exists("test_key").await.unwrap());
272
273        // Test list_keys
274        backend.put("prefix_1", b"value1").await.unwrap();
275        backend.put("prefix_2", b"value2").await.unwrap();
276        backend.put("other_1", b"value3").await.unwrap();
277
278        let keys = backend.list_keys("prefix_").await.unwrap();
279        assert_eq!(keys.len(), 2);
280
281        // Test stats
282        let stats = backend.stats().await.unwrap();
283        assert_eq!(stats.key_count, 3);
284    }
285
286    #[tokio::test]
287    async fn test_json_operations() {
288        use serde::Deserialize;
289        use serde::Serialize;
290
291        #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
292        struct TestData {
293            id: u64,
294            name: String,
295        }
296
297        let backend = create_storage(StorageConfig::Memory).await.unwrap();
298
299        let data = TestData {
300            id: 123,
301            name: "test".to_string(),
302        };
303
304        // Test put_json and get_json
305        backend.put_json("test_json", &data).await.unwrap();
306        let retrieved: Option<TestData> = backend.get_json("test_json").await.unwrap();
307        assert_eq!(retrieved, Some(data));
308    }
309}