// rocketmq_controller/storage/file_backend.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use parking_lot::RwLock;
use tokio::fs;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tracing::debug;
use tracing::info;
use tracing::warn;

use crate::error::ControllerError;
use crate::error::Result;
use crate::storage::StorageBackend;
use crate::storage::StorageStats;
35
/// File-based storage backend
///
/// Provides persistent storage using individual files for each key.
/// This is simpler than RocksDB but less performant for large datasets.
///
/// File structure:
/// ```text
///  data_dir/
///    ├── metadata.json (index of all keys)
///    └── data/
///         ├── <key_hash_1>.dat
///         ├── <key_hash_2>.dat
///         └── ...
/// ```
pub struct FileBackend {
    /// Root data directory; holds `metadata.json` and the `data/` subdirectory.
    path: PathBuf,

    /// In-memory index mapping each logical key to the on-disk file holding
    /// its value. Persisted to `metadata.json` via `save_index`; lookups in
    /// `get`/`exists`/`list_keys` consult only this map, so it is the source
    /// of truth for which keys exist.
    index: Arc<RwLock<HashMap<String, PathBuf>>>,
}
57
58impl FileBackend {
59    /// Create a new file-based backend
60    pub async fn new(path: PathBuf) -> Result<Self> {
61        info!("Opening file-based storage at {:?}", path);
62
63        // Create directories
64        fs::create_dir_all(&path).await.map_err(|e| {
65            ControllerError::StorageError(format!("Failed to create directory: {}", e))
66        })?;
67
68        let data_dir = path.join("data");
69        fs::create_dir_all(&data_dir).await.map_err(|e| {
70            ControllerError::StorageError(format!("Failed to create data directory: {}", e))
71        })?;
72
73        let backend = Self {
74            path,
75            index: Arc::new(RwLock::new(HashMap::new())),
76        };
77
78        // Load existing index
79        backend.load_index().await?;
80
81        info!("File-based storage opened successfully");
82
83        Ok(backend)
84    }
85
86    /// Get the storage path
87    pub fn path(&self) -> &PathBuf {
88        &self.path
89    }
90
91    /// Load index from metadata file
92    async fn load_index(&self) -> Result<()> {
93        let metadata_path = self.path.join("metadata.json");
94
95        if !metadata_path.exists() {
96            info!("No existing index found, starting fresh");
97            return Ok(());
98        }
99
100        let content = fs::read(&metadata_path).await.map_err(|e| {
101            ControllerError::StorageError(format!("Failed to read metadata: {}", e))
102        })?;
103
104        let loaded_index: HashMap<String, PathBuf> =
105            serde_json::from_slice(&content).map_err(|e| {
106                ControllerError::SerializationError(format!("Failed to parse metadata: {}", e))
107            })?;
108
109        *self.index.write() = loaded_index;
110
111        info!("Loaded index with {} keys", self.index.read().len());
112
113        Ok(())
114    }
115
116    /// Save index to metadata file
117    async fn save_index(&self) -> Result<()> {
118        let metadata_path = self.path.join("metadata.json");
119
120        let index = self.index.read().clone();
121        let content = serde_json::to_vec_pretty(&index).map_err(|e| {
122            ControllerError::SerializationError(format!("Failed to serialize metadata: {}", e))
123        })?;
124
125        fs::write(&metadata_path, content).await.map_err(|e| {
126            ControllerError::StorageError(format!("Failed to write metadata: {}", e))
127        })?;
128
129        Ok(())
130    }
131
132    /// Get file path for a key
133    fn get_file_path(&self, key: &str) -> PathBuf {
134        // Use hash to avoid file system issues with special characters
135        let hash = Self::hash_key(key);
136        self.path.join("data").join(format!("{}.dat", hash))
137    }
138
139    /// Hash a key to generate a filename
140    fn hash_key(key: &str) -> String {
141        // Simple hash function - use the key itself if safe, otherwise hash it
142        if key
143            .chars()
144            .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
145        {
146            key.to_string()
147        } else {
148            // Use a simple hash for keys with special characters
149            format!("{:x}", Self::simple_hash(key))
150        }
151    }
152
153    /// Simple hash function
154    fn simple_hash(s: &str) -> u64 {
155        let mut hash = 0u64;
156        for byte in s.bytes() {
157            hash = hash.wrapping_mul(31).wrapping_add(byte as u64);
158        }
159        hash
160    }
161}
162
163#[async_trait]
164impl StorageBackend for FileBackend {
165    async fn put(&self, key: &str, value: &[u8]) -> Result<()> {
166        debug!("File put: key={}, size={}", key, value.len());
167
168        let file_path = self.get_file_path(key);
169
170        // Write data to file
171        let mut file = fs::File::create(&file_path)
172            .await
173            .map_err(|e| ControllerError::StorageError(format!("Failed to create file: {}", e)))?;
174
175        file.write_all(value)
176            .await
177            .map_err(|e| ControllerError::StorageError(format!("Failed to write file: {}", e)))?;
178
179        file.sync_all()
180            .await
181            .map_err(|e| ControllerError::StorageError(format!("Failed to sync file: {}", e)))?;
182
183        // Update index
184        self.index.write().insert(key.to_string(), file_path);
185
186        // Save index periodically (every 10 operations)
187        if self.index.read().len() % 10 == 0 {
188            self.save_index().await?;
189        }
190
191        Ok(())
192    }
193
194    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
195        debug!("File get: key={}", key);
196
197        let file_path = match self.index.read().get(key) {
198            Some(path) => path.clone(),
199            None => return Ok(None),
200        };
201
202        // Check if file exists
203        if !file_path.exists() {
204            warn!("File not found for key: {}", key);
205            self.index.write().remove(key);
206            return Ok(None);
207        }
208
209        // Read file
210        let mut file = fs::File::open(&file_path)
211            .await
212            .map_err(|e| ControllerError::StorageError(format!("Failed to open file: {}", e)))?;
213
214        let mut buffer = Vec::new();
215        file.read_to_end(&mut buffer)
216            .await
217            .map_err(|e| ControllerError::StorageError(format!("Failed to read file: {}", e)))?;
218
219        Ok(Some(buffer))
220    }
221
222    async fn delete(&self, key: &str) -> Result<()> {
223        debug!("File delete: key={}", key);
224
225        let file_path = match self.index.write().remove(key) {
226            Some(path) => path,
227            None => return Ok(()), // Key doesn't exist
228        };
229
230        // Delete file
231        if file_path.exists() {
232            fs::remove_file(&file_path).await.map_err(|e| {
233                ControllerError::StorageError(format!("Failed to delete file: {}", e))
234            })?;
235        }
236
237        // Save index
238        self.save_index().await?;
239
240        Ok(())
241    }
242
243    async fn list_keys(&self, prefix: &str) -> Result<Vec<String>> {
244        debug!("File list_keys: prefix={}", prefix);
245
246        let keys: Vec<String> = self
247            .index
248            .read()
249            .keys()
250            .filter(|k| k.starts_with(prefix))
251            .cloned()
252            .collect();
253
254        Ok(keys)
255    }
256
257    async fn batch_put(&self, items: Vec<(String, Vec<u8>)>) -> Result<()> {
258        debug!("File batch_put: {} items", items.len());
259
260        for (key, value) in items {
261            self.put(&key, &value).await?;
262        }
263
264        // Save index after batch
265        self.save_index().await?;
266
267        Ok(())
268    }
269
270    async fn batch_delete(&self, keys: Vec<String>) -> Result<()> {
271        debug!("File batch_delete: {} keys", keys.len());
272
273        for key in keys {
274            self.delete(&key).await?;
275        }
276
277        Ok(())
278    }
279
280    async fn exists(&self, key: &str) -> Result<bool> {
281        debug!("File exists: key={}", key);
282        Ok(self.index.read().contains_key(key))
283    }
284
285    async fn clear(&self) -> Result<()> {
286        info!("File clear: removing all data");
287
288        // Delete all files
289        let data_dir = self.path.join("data");
290        if data_dir.exists() {
291            fs::remove_dir_all(&data_dir).await.map_err(|e| {
292                ControllerError::StorageError(format!("Failed to clear data: {}", e))
293            })?;
294
295            fs::create_dir_all(&data_dir).await.map_err(|e| {
296                ControllerError::StorageError(format!("Failed to recreate data directory: {}", e))
297            })?;
298        }
299
300        // Clear index
301        self.index.write().clear();
302        self.save_index().await?;
303
304        Ok(())
305    }
306
307    async fn sync(&self) -> Result<()> {
308        debug!("File sync");
309        self.save_index().await
310    }
311
312    async fn stats(&self) -> Result<StorageStats> {
313        debug!("File stats");
314
315        // Clone the paths to avoid holding the lock across await
316        let paths: Vec<PathBuf> = {
317            let index = self.index.read();
318            index.values().cloned().collect()
319        };
320
321        let key_count = paths.len();
322
323        // Calculate total size
324        let mut total_size = 0u64;
325        for path in paths {
326            if path.exists() {
327                if let Ok(metadata) = fs::metadata(&path).await {
328                    total_size += metadata.len();
329                }
330            }
331        }
332
333        Ok(StorageStats {
334            key_count,
335            total_size,
336            backend_info: format!("File-based storage at {:?}", self.path),
337        })
338    }
339}
340
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// Exercises the full CRUD surface of the backend against a temp dir.
    #[tokio::test]
    async fn test_file_backend() {
        let tmp = TempDir::new().unwrap();
        let backend = FileBackend::new(tmp.path().join("file_storage"))
            .await
            .unwrap();

        // Round-trip a single value.
        backend.put("test_key", b"test_value").await.unwrap();
        assert_eq!(
            backend.get("test_key").await.unwrap(),
            Some(b"test_value".to_vec())
        );

        // Existence checks for present and absent keys.
        assert!(backend.exists("test_key").await.unwrap());
        assert!(!backend.exists("nonexistent").await.unwrap());

        // Deletion removes the key.
        backend.delete("test_key").await.unwrap();
        assert!(!backend.exists("test_key").await.unwrap());

        // Batch writes land every item.
        backend
            .batch_put(vec![
                ("batch_1".to_string(), b"value1".to_vec()),
                ("batch_2".to_string(), b"value2".to_vec()),
            ])
            .await
            .unwrap();
        assert!(backend.exists("batch_1").await.unwrap());
        assert!(backend.exists("batch_2").await.unwrap());

        // Prefix listing finds exactly the matching keys.
        backend.put("prefix_1", b"value1").await.unwrap();
        backend.put("prefix_2", b"value2").await.unwrap();
        assert_eq!(backend.list_keys("prefix_").await.unwrap().len(), 2);

        // Stats reflect all stored keys.
        assert!(backend.stats().await.unwrap().key_count >= 4);

        // Sync flushes the index without error.
        backend.sync().await.unwrap();
    }

    /// Data written in one session must be readable in a fresh one.
    #[tokio::test]
    async fn test_file_backend_persistence() {
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().join("persistent_storage");

        // Session 1: write and flush.
        {
            let backend = FileBackend::new(dir.clone()).await.unwrap();
            backend.put("persist_key", b"persist_value").await.unwrap();
            backend.sync().await.unwrap();
        }

        // Session 2: the value survives a reopen.
        {
            let backend = FileBackend::new(dir).await.unwrap();
            assert_eq!(
                backend.get("persist_key").await.unwrap(),
                Some(b"persist_value".to_vec())
            );
        }
    }
}
411}