mappy_core/storage/
aof.rs1use super::{Storage, StorageStats, StorageConfig};
6use crate::{MapletError, MapletResult};
7use dashmap::DashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use std::time::Instant;
11use std::path::Path;
12use std::fs::OpenOptions;
13use std::io::{BufReader, Write, BufRead};
14use serde_json;
15use async_trait::async_trait;
16use tokio::time::{interval, Duration};
17
18pub struct AOFStorage {
20 cache: Arc<DashMap<String, Vec<u8>>>,
22 aof_path: std::path::PathBuf,
24 config: StorageConfig,
26 stats: Arc<RwLock<StorageStats>>,
28 #[allow(dead_code)]
30 start_time: Instant,
31 sync_handle: Option<tokio::task::JoinHandle<()>>,
33}
34
35#[derive(serde::Serialize, serde::Deserialize)]
36enum AOFEntry {
37 Set { key: String, value: Vec<u8> },
38 Delete { key: String },
39}
40
41impl AOFStorage {
42 pub fn new(config: StorageConfig) -> MapletResult<Self> {
44 std::fs::create_dir_all(&config.data_dir)
46 .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {}", e)))?;
47
48 let aof_path = Path::new(&config.data_dir).join("mappy.aof");
49
50 let mut storage = Self {
51 cache: Arc::new(DashMap::new()),
52 aof_path: aof_path.clone(),
53 config,
54 stats: Arc::new(RwLock::new(StorageStats::default())),
55 start_time: Instant::now(),
56 sync_handle: None,
57 };
58
59 storage.load_from_aof()?;
61
62 storage.start_sync_task();
64
65 Ok(storage)
66 }
67
68 fn load_from_aof(&self) -> MapletResult<()> {
70 if !self.aof_path.exists() {
71 return Ok(());
72 }
73
74 let file = std::fs::File::open(&self.aof_path)
75 .map_err(|e| MapletError::Internal(format!("Failed to open AOF file: {}", e)))?;
76
77 let reader = BufReader::new(file);
78
79 for line in reader.lines() {
80 let line = line.map_err(|e| MapletError::Internal(format!("Failed to read AOF line: {}", e)))?;
81 if line.trim().is_empty() {
82 continue;
83 }
84
85 let entry: AOFEntry = serde_json::from_str(&line)
86 .map_err(|e| MapletError::Internal(format!("Failed to parse AOF entry: {}", e)))?;
87
88 match entry {
89 AOFEntry::Set { key, value } => {
90 self.cache.insert(key, value);
91 }
92 AOFEntry::Delete { key } => {
93 self.cache.remove(&key);
94 }
95 }
96 }
97
98 Ok(())
99 }
100
101 fn append_to_aof(&self, entry: AOFEntry) -> MapletResult<()> {
103 let line = serde_json::to_string(&entry)
104 .map_err(|e| MapletError::Internal(format!("Failed to serialize AOF entry: {}", e)))?;
105
106 let mut file = OpenOptions::new()
107 .create(true)
108 .append(true)
109 .open(&self.aof_path)
110 .map_err(|e| MapletError::Internal(format!("Failed to open AOF file for append: {}", e)))?;
111
112 writeln!(file, "{}", line)
113 .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {}", e)))?;
114
115 Ok(())
116 }
117
118 fn start_sync_task(&mut self) {
120 let _cache = Arc::clone(&self.cache);
121 let aof_path = self.aof_path.clone();
122 let sync_interval = Duration::from_secs(self.config.sync_interval);
123
124 let handle = tokio::spawn(async move {
125 let mut interval = interval(sync_interval);
126
127 loop {
128 interval.tick().await;
129
130 if let Err(e) = std::fs::File::open(&aof_path)
132 .and_then(|f| f.sync_all())
133 {
134 eprintln!("Failed to sync AOF file: {}", e);
135 }
136 }
137 });
138
139 self.sync_handle = Some(handle);
140 }
141
142 async fn update_stats<F>(&self, f: F)
144 where
145 F: FnOnce(&mut StorageStats),
146 {
147 let mut stats = self.stats.write().await;
148 f(&mut stats);
149 }
150
151 fn calculate_memory_usage(&self) -> u64 {
153 let mut total = 0;
154 for entry in self.cache.iter() {
155 total += entry.key().len() + entry.value().len();
156 }
157 total as u64
158 }
159
160 fn calculate_disk_usage(&self) -> u64 {
162 if self.aof_path.exists() {
163 std::fs::metadata(&self.aof_path)
164 .map(|m| m.len())
165 .unwrap_or(0)
166 } else {
167 0
168 }
169 }
170}
171
172#[async_trait]
173impl Storage for AOFStorage {
174 async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
175 let start = Instant::now();
176 let result = self.cache.get(key).map(|entry| entry.value().clone());
177 let latency = start.elapsed().as_micros() as u64;
178
179 self.update_stats(|stats| {
180 stats.operations_count += 1;
181 stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
182 }).await;
183
184 Ok(result)
185 }
186
187 async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
188 let start = Instant::now();
189
190 self.cache.insert(key.clone(), value.clone());
192
193 let entry = AOFEntry::Set { key, value };
195 self.append_to_aof(entry)?;
196
197 let latency = start.elapsed().as_micros() as u64;
198
199 self.update_stats(|stats| {
200 stats.operations_count += 1;
201 stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
202 stats.total_keys = self.cache.len() as u64;
203 stats.memory_usage = self.calculate_memory_usage();
204 stats.disk_usage = self.calculate_disk_usage();
205 }).await;
206
207 Ok(())
208 }
209
210 async fn delete(&self, key: &str) -> MapletResult<bool> {
211 let start = Instant::now();
212 let existed = self.cache.remove(key).is_some();
213
214 if existed {
215 let entry = AOFEntry::Delete { key: key.to_string() };
217 self.append_to_aof(entry)?;
218 }
219
220 let latency = start.elapsed().as_micros() as u64;
221
222 self.update_stats(|stats| {
223 stats.operations_count += 1;
224 stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
225 stats.total_keys = self.cache.len() as u64;
226 stats.memory_usage = self.calculate_memory_usage();
227 stats.disk_usage = self.calculate_disk_usage();
228 }).await;
229
230 Ok(existed)
231 }
232
233 async fn exists(&self, key: &str) -> MapletResult<bool> {
234 let start = Instant::now();
235 let exists = self.cache.contains_key(key);
236 let latency = start.elapsed().as_micros() as u64;
237
238 self.update_stats(|stats| {
239 stats.operations_count += 1;
240 stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
241 }).await;
242
243 Ok(exists)
244 }
245
246 async fn keys(&self) -> MapletResult<Vec<String>> {
247 let start = Instant::now();
248 let keys: Vec<String> = self.cache.iter().map(|entry| entry.key().clone()).collect();
249 let latency = start.elapsed().as_micros() as u64;
250
251 self.update_stats(|stats| {
252 stats.operations_count += 1;
253 stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
254 }).await;
255
256 Ok(keys)
257 }
258
259 async fn clear_database(&self) -> MapletResult<()> {
260 let start = Instant::now();
261 self.cache.clear();
262
263 std::fs::write(&self.aof_path, "")
265 .map_err(|e| MapletError::Internal(format!("Failed to clear AOF file: {}", e)))?;
266
267 let latency = start.elapsed().as_micros() as u64;
268
269 self.update_stats(|stats| {
270 stats.operations_count += 1;
271 stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
272 stats.total_keys = 0;
273 stats.memory_usage = 0;
274 stats.disk_usage = 0;
275 }).await;
276
277 Ok(())
278 }
279
280 async fn flush(&self) -> MapletResult<()> {
281 let start = Instant::now();
282
283 std::fs::File::open(&self.aof_path)
285 .and_then(|f| f.sync_all())
286 .map_err(|e| MapletError::Internal(format!("Failed to sync AOF file: {}", e)))?;
287
288 let latency = start.elapsed().as_micros() as u64;
289
290 self.update_stats(|stats| {
291 stats.operations_count += 1;
292 stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
293 }).await;
294
295 Ok(())
296 }
297
298 async fn close(&self) -> MapletResult<()> {
299 if let Some(handle) = &self.sync_handle {
301 handle.abort();
302 }
303
304 self.flush().await?;
306
307 Ok(())
308 }
309
310 async fn stats(&self) -> MapletResult<StorageStats> {
311 let mut stats = self.stats.read().await.clone();
312 stats.total_keys = self.cache.len() as u64;
313 stats.memory_usage = self.calculate_memory_usage();
314 stats.disk_usage = self.calculate_disk_usage();
315 Ok(stats)
316 }
317}
318
319#[cfg(test)]
320mod tests {
321 use super::*;
322 use tempfile::TempDir;
323
324 #[tokio::test]
325 async fn test_aof_storage_basic_operations() {
326 let temp_dir = TempDir::new().unwrap();
327 let config = StorageConfig {
328 data_dir: temp_dir.path().to_string_lossy().to_string(),
329 sync_interval: 1,
330 ..Default::default()
331 };
332 let storage = AOFStorage::new(config).unwrap();
333
334 storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
336 let value = storage.get("key1").await.unwrap();
337 assert_eq!(value, Some(b"value1".to_vec()));
338
339 assert!(storage.exists("key1").await.unwrap());
341 assert!(!storage.exists("key2").await.unwrap());
342
343 let deleted = storage.delete("key1").await.unwrap();
345 assert!(deleted);
346 assert!(!storage.exists("key1").await.unwrap());
347 }
348
349 #[tokio::test]
350 async fn test_aof_storage_persistence() {
351 let temp_dir = TempDir::new().unwrap();
352 let config = StorageConfig {
353 data_dir: temp_dir.path().to_string_lossy().to_string(),
354 sync_interval: 1,
355 ..Default::default()
356 };
357
358 {
360 let storage = AOFStorage::new(config.clone()).unwrap();
361 storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
362 storage.flush().await.unwrap();
363 }
364
365 {
367 let storage = AOFStorage::new(config).unwrap();
368 let value = storage.get("key1").await.unwrap();
369 assert_eq!(value, Some(b"value1".to_vec()));
370 }
371 }
372
373 #[tokio::test]
374 async fn test_aof_storage_stats() {
375 let temp_dir = TempDir::new().unwrap();
376 let config = StorageConfig {
377 data_dir: temp_dir.path().to_string_lossy().to_string(),
378 sync_interval: 1,
379 ..Default::default()
380 };
381 let storage = AOFStorage::new(config).unwrap();
382
383 storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
384
385 let stats = storage.stats().await.unwrap();
386 assert_eq!(stats.total_keys, 1);
387 assert!(stats.memory_usage > 0);
388 assert!(stats.disk_usage > 0);
389 assert!(stats.operations_count > 0);
390 }
391}