sh_layer1/
storage_engine.rs1use anyhow::Result;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::path::PathBuf;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct StorageConfig {
19 pub storage_type: StorageType,
21 pub base_path: String,
23 pub max_file_size: u64,
25}
26
27impl Default for StorageConfig {
28 fn default() -> Self {
29 Self {
30 storage_type: StorageType::FileSystem,
31 base_path: "./data".to_string(),
32 max_file_size: 100 * 1024 * 1024, }
34 }
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
43pub enum StorageType {
44 FileSystem,
45 Memory,
46 S3,
47}
48
49pub struct StorageEngine {
51 config: StorageConfig,
52 memory: RwLock<HashMap<String, Vec<u8>>>,
53}
54
55impl StorageEngine {
56 pub fn new(config: StorageConfig) -> Self {
57 Self {
58 config,
59 memory: RwLock::new(HashMap::new()),
60 }
61 }
62
63 pub async fn read(&self, key: &str) -> Result<Vec<u8>> {
65 match self.config.storage_type {
66 StorageType::FileSystem => {
67 let path = PathBuf::from(&self.config.base_path).join(key);
68 let data = tokio::fs::read(&path).await?;
69 Ok(data)
70 }
71 StorageType::Memory => self
72 .memory
73 .read()
74 .get(key)
75 .cloned()
76 .ok_or_else(|| anyhow::anyhow!("Memory storage key not found: {}", key)),
77 StorageType::S3 => Err(Self::s3_unimplemented("read", key)),
78 }
79 }
80
81 pub async fn write(&self, key: &str, data: &[u8]) -> Result<()> {
83 match self.config.storage_type {
84 StorageType::FileSystem => {
85 let path = PathBuf::from(&self.config.base_path).join(key);
86
87 if let Some(parent) = path.parent() {
89 tokio::fs::create_dir_all(parent).await?;
90 }
91
92 tokio::fs::write(&path, data).await?;
93 Ok(())
94 }
95 StorageType::Memory => {
96 self.memory.write().insert(key.to_string(), data.to_vec());
97 Ok(())
98 }
99 StorageType::S3 => Err(Self::s3_unimplemented("write", key)),
100 }
101 }
102
103 pub async fn delete(&self, key: &str) -> Result<()> {
105 match self.config.storage_type {
106 StorageType::FileSystem => {
107 let path = PathBuf::from(&self.config.base_path).join(key);
108 tokio::fs::remove_file(&path).await?;
109 Ok(())
110 }
111 StorageType::Memory => {
112 self.memory.write().remove(key);
113 Ok(())
114 }
115 StorageType::S3 => Err(Self::s3_unimplemented("delete", key)),
116 }
117 }
118
119 pub async fn exists(&self, key: &str) -> Result<bool> {
121 match self.config.storage_type {
122 StorageType::FileSystem => {
123 let path = PathBuf::from(&self.config.base_path).join(key);
124 Ok(path.exists())
125 }
126 StorageType::Memory => Ok(self.memory.read().contains_key(key)),
127 StorageType::S3 => Err(Self::s3_unimplemented("exists", key)),
128 }
129 }
130
131 pub async fn list(&self, prefix: &str) -> Result<Vec<String>> {
133 match self.config.storage_type {
134 StorageType::FileSystem => {
135 let path = PathBuf::from(&self.config.base_path).join(prefix);
136 let mut entries = Vec::new();
137
138 if path.is_dir() {
139 let mut dir = tokio::fs::read_dir(&path).await?;
140 while let Some(entry) = dir.next_entry().await? {
141 if let Some(name) = entry.file_name().to_str() {
142 entries.push(name.to_string());
143 }
144 }
145 }
146
147 Ok(entries)
148 }
149 StorageType::Memory => Ok(self
150 .memory
151 .read()
152 .keys()
153 .filter(|key| key.starts_with(prefix))
154 .cloned()
155 .collect()),
156 StorageType::S3 => Err(Self::s3_unimplemented("list", prefix)),
157 }
158 }
159
160 fn s3_unimplemented(operation: &str, key: &str) -> anyhow::Error {
161 anyhow::anyhow!(
162 "[experimental] S3 storage operation '{}' is not yet implemented (key: {})",
163 operation,
164 key
165 )
166 }
167}
168
169impl Default for StorageEngine {
170 fn default() -> Self {
171 Self::new(StorageConfig::default())
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use super::*;
178 use tempfile::TempDir;
179
180 #[test]
181 fn test_storage_config_default() {
182 let config = StorageConfig::default();
183 assert!(matches!(config.storage_type, StorageType::FileSystem));
184 assert_eq!(config.base_path, "./data");
185 }
186
187 #[test]
188 fn test_storage_type_filesystem() {
189 let config = StorageConfig {
190 storage_type: StorageType::FileSystem,
191 ..Default::default()
192 };
193 assert!(matches!(config.storage_type, StorageType::FileSystem));
194 }
195
196 #[tokio::test]
197 async fn test_filesystem_write_and_read() {
198 let dir = TempDir::new().unwrap();
199 let config = StorageConfig {
200 storage_type: StorageType::FileSystem,
201 base_path: dir.path().to_str().unwrap().to_string(),
202 max_file_size: 1024 * 1024,
203 };
204 let engine = StorageEngine::new(config);
205
206 engine.write("test.txt", b"hello world").await.unwrap();
207 let data = engine.read("test.txt").await.unwrap();
208 assert_eq!(data, b"hello world");
209 }
210
211 #[tokio::test]
212 async fn test_filesystem_exists() {
213 let dir = TempDir::new().unwrap();
214 let config = StorageConfig {
215 storage_type: StorageType::FileSystem,
216 base_path: dir.path().to_str().unwrap().to_string(),
217 max_file_size: 1024 * 1024,
218 };
219 let engine = StorageEngine::new(config);
220
221 assert!(!engine.exists("test.txt").await.unwrap());
222 engine.write("test.txt", b"hello").await.unwrap();
223 assert!(engine.exists("test.txt").await.unwrap());
224 }
225
226 #[tokio::test]
227 async fn test_filesystem_delete() {
228 let dir = TempDir::new().unwrap();
229 let config = StorageConfig {
230 storage_type: StorageType::FileSystem,
231 base_path: dir.path().to_str().unwrap().to_string(),
232 max_file_size: 1024 * 1024,
233 };
234 let engine = StorageEngine::new(config);
235
236 engine.write("test.txt", b"hello").await.unwrap();
237 engine.delete("test.txt").await.unwrap();
238 assert!(!engine.exists("test.txt").await.unwrap());
239 }
240
241 #[tokio::test]
242 async fn test_memory_storage_write_read_list_delete() {
243 let config = StorageConfig {
244 storage_type: StorageType::Memory,
245 ..Default::default()
246 };
247 let engine = StorageEngine::new(config);
248
249 engine.write("test/a.txt", b"hello").await.unwrap();
250 engine.write("other.txt", b"world").await.unwrap();
251
252 assert_eq!(engine.read("test/a.txt").await.unwrap(), b"hello");
253 assert!(engine.exists("test/a.txt").await.unwrap());
254 assert_eq!(
255 engine.list("test/").await.unwrap(),
256 vec!["test/a.txt".to_string()]
257 );
258
259 engine.delete("test/a.txt").await.unwrap();
260 assert!(!engine.exists("test/a.txt").await.unwrap());
261 }
262
263 #[tokio::test]
264 async fn test_s3_storage_not_implemented() {
265 let config = StorageConfig {
266 storage_type: StorageType::S3,
267 ..Default::default()
268 };
269 let engine = StorageEngine::new(config);
270
271 let result = engine.write("test", b"data").await;
272 assert!(result.is_err());
273 let error = result.unwrap_err().to_string();
274 assert!(error.contains("[experimental]"));
275 assert!(error.contains("S3 storage operation 'write'"));
276 }
277}