Skip to main content

sh_layer1/
storage_engine.rs

1//! 存储引擎模块
2//!
3//! 统一的存储抽象,支持文件、对象存储等。
4//!
5//! **功能状态:**
6//! - `[STABLE]` FileSystem 存储 - 已完整实现
7//! - `[STABLE]` Memory 存储 - 内存 HashMap 存储
8//! - `[EXPERIMENTAL]` S3 存储 - 尚未实现
9
10use anyhow::Result;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::path::PathBuf;
15
16/// 存储配置
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct StorageConfig {
19    /// 存储类型
20    pub storage_type: StorageType,
21    /// 基础路径
22    pub base_path: String,
23    /// 最大文件大小(字节)
24    pub max_file_size: u64,
25}
26
27impl Default for StorageConfig {
28    fn default() -> Self {
29        Self {
30            storage_type: StorageType::FileSystem,
31            base_path: "./data".to_string(),
32            max_file_size: 100 * 1024 * 1024, // 100MB
33        }
34    }
35}
36
37/// 存储类型
38///
39/// - `FileSystem` - [STABLE] 本地文件系统存储
40/// - `Memory` - [STABLE] 内存 HashMap 存储
41/// - `S3` - [EXPERIMENTAL] S3 对象存储,尚未实现
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub enum StorageType {
44    FileSystem,
45    Memory,
46    S3,
47}
48
49/// 存储引擎
50pub struct StorageEngine {
51    config: StorageConfig,
52    memory: RwLock<HashMap<String, Vec<u8>>>,
53}
54
55impl StorageEngine {
56    pub fn new(config: StorageConfig) -> Self {
57        Self {
58            config,
59            memory: RwLock::new(HashMap::new()),
60        }
61    }
62
63    /// 读取数据
64    pub async fn read(&self, key: &str) -> Result<Vec<u8>> {
65        match self.config.storage_type {
66            StorageType::FileSystem => {
67                let path = PathBuf::from(&self.config.base_path).join(key);
68                let data = tokio::fs::read(&path).await?;
69                Ok(data)
70            }
71            StorageType::Memory => self
72                .memory
73                .read()
74                .get(key)
75                .cloned()
76                .ok_or_else(|| anyhow::anyhow!("Memory storage key not found: {}", key)),
77            StorageType::S3 => Err(Self::s3_unimplemented("read", key)),
78        }
79    }
80
81    /// 写入数据
82    pub async fn write(&self, key: &str, data: &[u8]) -> Result<()> {
83        match self.config.storage_type {
84            StorageType::FileSystem => {
85                let path = PathBuf::from(&self.config.base_path).join(key);
86
87                // 确保目录存在
88                if let Some(parent) = path.parent() {
89                    tokio::fs::create_dir_all(parent).await?;
90                }
91
92                tokio::fs::write(&path, data).await?;
93                Ok(())
94            }
95            StorageType::Memory => {
96                self.memory.write().insert(key.to_string(), data.to_vec());
97                Ok(())
98            }
99            StorageType::S3 => Err(Self::s3_unimplemented("write", key)),
100        }
101    }
102
103    /// 删除数据
104    pub async fn delete(&self, key: &str) -> Result<()> {
105        match self.config.storage_type {
106            StorageType::FileSystem => {
107                let path = PathBuf::from(&self.config.base_path).join(key);
108                tokio::fs::remove_file(&path).await?;
109                Ok(())
110            }
111            StorageType::Memory => {
112                self.memory.write().remove(key);
113                Ok(())
114            }
115            StorageType::S3 => Err(Self::s3_unimplemented("delete", key)),
116        }
117    }
118
119    /// 检查是否存在
120    pub async fn exists(&self, key: &str) -> Result<bool> {
121        match self.config.storage_type {
122            StorageType::FileSystem => {
123                let path = PathBuf::from(&self.config.base_path).join(key);
124                Ok(path.exists())
125            }
126            StorageType::Memory => Ok(self.memory.read().contains_key(key)),
127            StorageType::S3 => Err(Self::s3_unimplemented("exists", key)),
128        }
129    }
130
131    /// 列出所有键
132    pub async fn list(&self, prefix: &str) -> Result<Vec<String>> {
133        match self.config.storage_type {
134            StorageType::FileSystem => {
135                let path = PathBuf::from(&self.config.base_path).join(prefix);
136                let mut entries = Vec::new();
137
138                if path.is_dir() {
139                    let mut dir = tokio::fs::read_dir(&path).await?;
140                    while let Some(entry) = dir.next_entry().await? {
141                        if let Some(name) = entry.file_name().to_str() {
142                            entries.push(name.to_string());
143                        }
144                    }
145                }
146
147                Ok(entries)
148            }
149            StorageType::Memory => Ok(self
150                .memory
151                .read()
152                .keys()
153                .filter(|key| key.starts_with(prefix))
154                .cloned()
155                .collect()),
156            StorageType::S3 => Err(Self::s3_unimplemented("list", prefix)),
157        }
158    }
159
160    fn s3_unimplemented(operation: &str, key: &str) -> anyhow::Error {
161        anyhow::anyhow!(
162            "[experimental] S3 storage operation '{}' is not yet implemented (key: {})",
163            operation,
164            key
165        )
166    }
167}
168
169impl Default for StorageEngine {
170    fn default() -> Self {
171        Self::new(StorageConfig::default())
172    }
173}
174
175#[cfg(test)]
176mod tests {
177    use super::*;
178    use tempfile::TempDir;
179
180    #[test]
181    fn test_storage_config_default() {
182        let config = StorageConfig::default();
183        assert!(matches!(config.storage_type, StorageType::FileSystem));
184        assert_eq!(config.base_path, "./data");
185    }
186
187    #[test]
188    fn test_storage_type_filesystem() {
189        let config = StorageConfig {
190            storage_type: StorageType::FileSystem,
191            ..Default::default()
192        };
193        assert!(matches!(config.storage_type, StorageType::FileSystem));
194    }
195
196    #[tokio::test]
197    async fn test_filesystem_write_and_read() {
198        let dir = TempDir::new().unwrap();
199        let config = StorageConfig {
200            storage_type: StorageType::FileSystem,
201            base_path: dir.path().to_str().unwrap().to_string(),
202            max_file_size: 1024 * 1024,
203        };
204        let engine = StorageEngine::new(config);
205
206        engine.write("test.txt", b"hello world").await.unwrap();
207        let data = engine.read("test.txt").await.unwrap();
208        assert_eq!(data, b"hello world");
209    }
210
211    #[tokio::test]
212    async fn test_filesystem_exists() {
213        let dir = TempDir::new().unwrap();
214        let config = StorageConfig {
215            storage_type: StorageType::FileSystem,
216            base_path: dir.path().to_str().unwrap().to_string(),
217            max_file_size: 1024 * 1024,
218        };
219        let engine = StorageEngine::new(config);
220
221        assert!(!engine.exists("test.txt").await.unwrap());
222        engine.write("test.txt", b"hello").await.unwrap();
223        assert!(engine.exists("test.txt").await.unwrap());
224    }
225
226    #[tokio::test]
227    async fn test_filesystem_delete() {
228        let dir = TempDir::new().unwrap();
229        let config = StorageConfig {
230            storage_type: StorageType::FileSystem,
231            base_path: dir.path().to_str().unwrap().to_string(),
232            max_file_size: 1024 * 1024,
233        };
234        let engine = StorageEngine::new(config);
235
236        engine.write("test.txt", b"hello").await.unwrap();
237        engine.delete("test.txt").await.unwrap();
238        assert!(!engine.exists("test.txt").await.unwrap());
239    }
240
241    #[tokio::test]
242    async fn test_memory_storage_write_read_list_delete() {
243        let config = StorageConfig {
244            storage_type: StorageType::Memory,
245            ..Default::default()
246        };
247        let engine = StorageEngine::new(config);
248
249        engine.write("test/a.txt", b"hello").await.unwrap();
250        engine.write("other.txt", b"world").await.unwrap();
251
252        assert_eq!(engine.read("test/a.txt").await.unwrap(), b"hello");
253        assert!(engine.exists("test/a.txt").await.unwrap());
254        assert_eq!(
255            engine.list("test/").await.unwrap(),
256            vec!["test/a.txt".to_string()]
257        );
258
259        engine.delete("test/a.txt").await.unwrap();
260        assert!(!engine.exists("test/a.txt").await.unwrap());
261    }
262
263    #[tokio::test]
264    async fn test_s3_storage_not_implemented() {
265        let config = StorageConfig {
266            storage_type: StorageType::S3,
267            ..Default::default()
268        };
269        let engine = StorageEngine::new(config);
270
271        let result = engine.write("test", b"data").await;
272        assert!(result.is_err());
273        let error = result.unwrap_err().to_string();
274        assert!(error.contains("[experimental]"));
275        assert!(error.contains("S3 storage operation 'write'"));
276    }
277}