rocketmq_controller/storage/
file_backend.rs1use std::collections::HashMap;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use parking_lot::RwLock;
24use tokio::fs;
25use tokio::io::AsyncReadExt;
26use tokio::io::AsyncWriteExt;
27use tracing::debug;
28use tracing::info;
29use tracing::warn;
30
31use crate::error::ControllerError;
32use crate::error::Result;
33use crate::storage::StorageBackend;
34use crate::storage::StorageStats;
35
36pub struct FileBackend {
51 path: PathBuf,
53
54 index: Arc<RwLock<HashMap<String, PathBuf>>>,
56}
57
58impl FileBackend {
59 pub async fn new(path: PathBuf) -> Result<Self> {
61 info!("Opening file-based storage at {:?}", path);
62
63 fs::create_dir_all(&path).await.map_err(|e| {
65 ControllerError::StorageError(format!("Failed to create directory: {}", e))
66 })?;
67
68 let data_dir = path.join("data");
69 fs::create_dir_all(&data_dir).await.map_err(|e| {
70 ControllerError::StorageError(format!("Failed to create data directory: {}", e))
71 })?;
72
73 let backend = Self {
74 path,
75 index: Arc::new(RwLock::new(HashMap::new())),
76 };
77
78 backend.load_index().await?;
80
81 info!("File-based storage opened successfully");
82
83 Ok(backend)
84 }
85
86 pub fn path(&self) -> &PathBuf {
88 &self.path
89 }
90
91 async fn load_index(&self) -> Result<()> {
93 let metadata_path = self.path.join("metadata.json");
94
95 if !metadata_path.exists() {
96 info!("No existing index found, starting fresh");
97 return Ok(());
98 }
99
100 let content = fs::read(&metadata_path).await.map_err(|e| {
101 ControllerError::StorageError(format!("Failed to read metadata: {}", e))
102 })?;
103
104 let loaded_index: HashMap<String, PathBuf> =
105 serde_json::from_slice(&content).map_err(|e| {
106 ControllerError::SerializationError(format!("Failed to parse metadata: {}", e))
107 })?;
108
109 *self.index.write() = loaded_index;
110
111 info!("Loaded index with {} keys", self.index.read().len());
112
113 Ok(())
114 }
115
116 async fn save_index(&self) -> Result<()> {
118 let metadata_path = self.path.join("metadata.json");
119
120 let index = self.index.read().clone();
121 let content = serde_json::to_vec_pretty(&index).map_err(|e| {
122 ControllerError::SerializationError(format!("Failed to serialize metadata: {}", e))
123 })?;
124
125 fs::write(&metadata_path, content).await.map_err(|e| {
126 ControllerError::StorageError(format!("Failed to write metadata: {}", e))
127 })?;
128
129 Ok(())
130 }
131
132 fn get_file_path(&self, key: &str) -> PathBuf {
134 let hash = Self::hash_key(key);
136 self.path.join("data").join(format!("{}.dat", hash))
137 }
138
139 fn hash_key(key: &str) -> String {
141 if key
143 .chars()
144 .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
145 {
146 key.to_string()
147 } else {
148 format!("{:x}", Self::simple_hash(key))
150 }
151 }
152
153 fn simple_hash(s: &str) -> u64 {
155 let mut hash = 0u64;
156 for byte in s.bytes() {
157 hash = hash.wrapping_mul(31).wrapping_add(byte as u64);
158 }
159 hash
160 }
161}
162
163#[async_trait]
164impl StorageBackend for FileBackend {
165 async fn put(&self, key: &str, value: &[u8]) -> Result<()> {
166 debug!("File put: key={}, size={}", key, value.len());
167
168 let file_path = self.get_file_path(key);
169
170 let mut file = fs::File::create(&file_path)
172 .await
173 .map_err(|e| ControllerError::StorageError(format!("Failed to create file: {}", e)))?;
174
175 file.write_all(value)
176 .await
177 .map_err(|e| ControllerError::StorageError(format!("Failed to write file: {}", e)))?;
178
179 file.sync_all()
180 .await
181 .map_err(|e| ControllerError::StorageError(format!("Failed to sync file: {}", e)))?;
182
183 self.index.write().insert(key.to_string(), file_path);
185
186 if self.index.read().len() % 10 == 0 {
188 self.save_index().await?;
189 }
190
191 Ok(())
192 }
193
194 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
195 debug!("File get: key={}", key);
196
197 let file_path = match self.index.read().get(key) {
198 Some(path) => path.clone(),
199 None => return Ok(None),
200 };
201
202 if !file_path.exists() {
204 warn!("File not found for key: {}", key);
205 self.index.write().remove(key);
206 return Ok(None);
207 }
208
209 let mut file = fs::File::open(&file_path)
211 .await
212 .map_err(|e| ControllerError::StorageError(format!("Failed to open file: {}", e)))?;
213
214 let mut buffer = Vec::new();
215 file.read_to_end(&mut buffer)
216 .await
217 .map_err(|e| ControllerError::StorageError(format!("Failed to read file: {}", e)))?;
218
219 Ok(Some(buffer))
220 }
221
222 async fn delete(&self, key: &str) -> Result<()> {
223 debug!("File delete: key={}", key);
224
225 let file_path = match self.index.write().remove(key) {
226 Some(path) => path,
227 None => return Ok(()), };
229
230 if file_path.exists() {
232 fs::remove_file(&file_path).await.map_err(|e| {
233 ControllerError::StorageError(format!("Failed to delete file: {}", e))
234 })?;
235 }
236
237 self.save_index().await?;
239
240 Ok(())
241 }
242
243 async fn list_keys(&self, prefix: &str) -> Result<Vec<String>> {
244 debug!("File list_keys: prefix={}", prefix);
245
246 let keys: Vec<String> = self
247 .index
248 .read()
249 .keys()
250 .filter(|k| k.starts_with(prefix))
251 .cloned()
252 .collect();
253
254 Ok(keys)
255 }
256
257 async fn batch_put(&self, items: Vec<(String, Vec<u8>)>) -> Result<()> {
258 debug!("File batch_put: {} items", items.len());
259
260 for (key, value) in items {
261 self.put(&key, &value).await?;
262 }
263
264 self.save_index().await?;
266
267 Ok(())
268 }
269
270 async fn batch_delete(&self, keys: Vec<String>) -> Result<()> {
271 debug!("File batch_delete: {} keys", keys.len());
272
273 for key in keys {
274 self.delete(&key).await?;
275 }
276
277 Ok(())
278 }
279
280 async fn exists(&self, key: &str) -> Result<bool> {
281 debug!("File exists: key={}", key);
282 Ok(self.index.read().contains_key(key))
283 }
284
285 async fn clear(&self) -> Result<()> {
286 info!("File clear: removing all data");
287
288 let data_dir = self.path.join("data");
290 if data_dir.exists() {
291 fs::remove_dir_all(&data_dir).await.map_err(|e| {
292 ControllerError::StorageError(format!("Failed to clear data: {}", e))
293 })?;
294
295 fs::create_dir_all(&data_dir).await.map_err(|e| {
296 ControllerError::StorageError(format!("Failed to recreate data directory: {}", e))
297 })?;
298 }
299
300 self.index.write().clear();
302 self.save_index().await?;
303
304 Ok(())
305 }
306
307 async fn sync(&self) -> Result<()> {
308 debug!("File sync");
309 self.save_index().await
310 }
311
312 async fn stats(&self) -> Result<StorageStats> {
313 debug!("File stats");
314
315 let paths: Vec<PathBuf> = {
317 let index = self.index.read();
318 index.values().cloned().collect()
319 };
320
321 let key_count = paths.len();
322
323 let mut total_size = 0u64;
325 for path in paths {
326 if path.exists() {
327 if let Ok(metadata) = fs::metadata(&path).await {
328 total_size += metadata.len();
329 }
330 }
331 }
332
333 Ok(StorageStats {
334 key_count,
335 total_size,
336 backend_info: format!("File-based storage at {:?}", self.path),
337 })
338 }
339}
340
341#[cfg(test)]
342mod tests {
343 use tempfile::TempDir;
344
345 use super::*;
346
347 #[tokio::test]
348 async fn test_file_backend() {
349 let temp_dir = TempDir::new().unwrap();
350 let storage_path = temp_dir.path().join("file_storage");
351
352 let backend = FileBackend::new(storage_path).await.unwrap();
353
354 backend.put("test_key", b"test_value").await.unwrap();
356 let value = backend.get("test_key").await.unwrap();
357 assert_eq!(value, Some(b"test_value".to_vec()));
358
359 assert!(backend.exists("test_key").await.unwrap());
361 assert!(!backend.exists("nonexistent").await.unwrap());
362
363 backend.delete("test_key").await.unwrap();
365 assert!(!backend.exists("test_key").await.unwrap());
366
367 let items = vec![
369 ("batch_1".to_string(), b"value1".to_vec()),
370 ("batch_2".to_string(), b"value2".to_vec()),
371 ];
372 backend.batch_put(items).await.unwrap();
373
374 assert!(backend.exists("batch_1").await.unwrap());
375 assert!(backend.exists("batch_2").await.unwrap());
376
377 backend.put("prefix_1", b"value1").await.unwrap();
379 backend.put("prefix_2", b"value2").await.unwrap();
380
381 let keys = backend.list_keys("prefix_").await.unwrap();
382 assert_eq!(keys.len(), 2);
383
384 let stats = backend.stats().await.unwrap();
386 assert!(stats.key_count >= 4);
387
388 backend.sync().await.unwrap();
390 }
391
392 #[tokio::test]
393 async fn test_file_backend_persistence() {
394 let temp_dir = TempDir::new().unwrap();
395 let storage_path = temp_dir.path().join("persistent_storage");
396
397 {
399 let backend = FileBackend::new(storage_path.clone()).await.unwrap();
400 backend.put("persist_key", b"persist_value").await.unwrap();
401 backend.sync().await.unwrap();
402 }
403
404 {
406 let backend = FileBackend::new(storage_path).await.unwrap();
407 let value = backend.get("persist_key").await.unwrap();
408 assert_eq!(value, Some(b"persist_value".to_vec()));
409 }
410 }
411}