mappy_core/storage/hybrid.rs

use super::{Storage, StorageConfig, StorageStats};
use crate::{MapletError, MapletResult};
use async_trait::async_trait;
use dashmap::DashMap;
use serde_json;
use std::fs::OpenOptions;
use std::io::{BufRead, BufReader, Write};
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio::time::{interval, Duration};

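/// Hybrid storage engine: an in-memory `DashMap` cache for reads, backed by an
/// append-only file (AOF) of JSON-encoded operations for durability. Writes are
/// buffered in memory and flushed to the AOF either when the buffer fills or by
/// a periodic background sync task.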
pub struct HybridStorage {
    cache: Arc<DashMap<String, Vec<u8>>>,
    aof_path: std::path::PathBuf,
    config: StorageConfig,
    stats: Arc<RwLock<StorageStats>>,
    #[allow(dead_code)]
    start_time: Instant,
    sync_handle: Option<tokio::task::JoinHandle<()>>,
    write_buffer: Arc<RwLock<Vec<AOFEntry>>>,
}

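/// A single logged operation in the append-only file, stored as one JSON object per line.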
#[derive(serde::Serialize, serde::Deserialize, Clone)]
enum AOFEntry {
    Set { key: String, value: Vec<u8> },
    Delete { key: String },
}

impl HybridStorage {
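    /// Creates the data directory if needed, replays any existing AOF into the
    /// in-memory cache, and starts the background sync task. Must be called
    /// from within a Tokio runtime, since the sync task is spawned here.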
    pub fn new(config: StorageConfig) -> MapletResult<Self> {
        std::fs::create_dir_all(&config.data_dir)
            .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {e}")))?;

        let aof_path = Path::new(&config.data_dir).join("mappy.aof");

        let mut storage = Self {
            cache: Arc::new(DashMap::new()),
            aof_path,
            config,
            stats: Arc::new(RwLock::new(StorageStats::default())),
            start_time: Instant::now(),
            sync_handle: None,
            write_buffer: Arc::new(RwLock::new(Vec::new())),
        };

        storage.load_from_aof()?;
        storage.start_sync_task();

        Ok(storage)
    }

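    /// Replays the AOF from disk into the in-memory cache, applying each logged
    /// `Set` and `Delete` in order. A missing file is treated as an empty database.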
    fn load_from_aof(&self) -> MapletResult<()> {
        if !self.aof_path.exists() {
            return Ok(());
        }

        let file = std::fs::File::open(&self.aof_path)
            .map_err(|e| MapletError::Internal(format!("Failed to open AOF file: {e}")))?;
        let reader = BufReader::new(file);

        for line in reader.lines() {
            let line = line
                .map_err(|e| MapletError::Internal(format!("Failed to read AOF line: {e}")))?;
            if line.trim().is_empty() {
                continue;
            }

            let entry: AOFEntry = serde_json::from_str(&line)
                .map_err(|e| MapletError::Internal(format!("Failed to parse AOF entry: {e}")))?;

            match entry {
                AOFEntry::Set { key, value } => {
                    self.cache.insert(key, value);
                }
                AOFEntry::Delete { key } => {
                    self.cache.remove(&key);
                }
            }
        }

        Ok(())
    }

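    /// Appends an entry to the in-memory write buffer, flushing to the AOF
    /// immediately once the buffer reaches `write_buffer_size`.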
    async fn add_to_buffer(&self, entry: AOFEntry) -> MapletResult<()> {
        let mut buffer = self.write_buffer.write().await;
        buffer.push(entry);

        if buffer.len() >= self.config.write_buffer_size {
            self.flush_buffer_internal(&mut buffer)?;
        }

        Ok(())
    }

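    /// Flushes the write buffer to this instance's AOF path.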
    fn flush_buffer_internal(&self, buffer: &mut Vec<AOFEntry>) -> MapletResult<()> {
        Self::flush_buffer_internal_static(buffer, &self.aof_path)
    }

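    /// Spawns a background task that periodically flushes the write buffer to
    /// the AOF and fsyncs the file, at `sync_interval`-second granularity.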
    fn start_sync_task(&mut self) {
        let write_buffer = Arc::clone(&self.write_buffer);
        let aof_path = self.aof_path.clone();
        let sync_interval = Duration::from_secs(self.config.sync_interval);

        let handle = tokio::spawn(async move {
            let mut interval = interval(sync_interval);

            loop {
                interval.tick().await;

                {
                    let mut buffer = write_buffer.write().await;
                    if !buffer.is_empty() {
                        if let Err(e) = Self::flush_buffer_internal_static(&mut buffer, &aof_path) {
                            eprintln!("Failed to flush AOF buffer: {e}");
                        }
                    }
                }

                // Only fsync once the AOF exists; before the first flush there is
                // nothing on disk to sync and opening the file would just error.
                if aof_path.exists() {
                    if let Err(e) = std::fs::File::open(&aof_path).and_then(|f| f.sync_all()) {
                        eprintln!("Failed to sync AOF file: {e}");
                    }
                }
            }
        });

        self.sync_handle = Some(handle);
    }

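    /// Serializes each buffered entry as one JSON line, appends it to the AOF,
    /// and clears the buffer. Kept as an associated function so the background
    /// sync task, which has no `&self`, can reuse it.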
    fn flush_buffer_internal_static(
        buffer: &mut Vec<AOFEntry>,
        aof_path: &std::path::Path,
    ) -> MapletResult<()> {
        if buffer.is_empty() {
            return Ok(());
        }

        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(aof_path)
            .map_err(|e| MapletError::Internal(format!("Failed to open AOF file for append: {e}")))?;

        for entry in buffer.iter() {
            let line = serde_json::to_string(entry)
                .map_err(|e| MapletError::Internal(format!("Failed to serialize AOF entry: {e}")))?;
            writeln!(file, "{line}")
                .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {e}")))?;
        }

        buffer.clear();
        Ok(())
    }

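    /// Applies a mutation to the shared stats under the write lock.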
    async fn update_stats<F>(&self, f: F)
    where
        F: FnOnce(&mut StorageStats),
    {
        let mut stats = self.stats.write().await;
        f(&mut stats);
    }

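    /// Approximates memory usage as the total bytes of keys and values in the cache.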
    fn calculate_memory_usage(&self) -> u64 {
        let mut total = 0;
        for entry in self.cache.iter() {
            total += entry.key().len() + entry.value().len();
        }
        total as u64
    }

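    /// Reports the on-disk size of the AOF, or 0 if it does not exist yet.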
    fn calculate_disk_usage(&self) -> u64 {
        if self.aof_path.exists() {
            std::fs::metadata(&self.aof_path)
                .map(|m| m.len())
                .unwrap_or(0)
        } else {
            0
        }
    }
}

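// `Storage` implementation: reads are served entirely from the in-memory cache,
// while writes update the cache and append to the AOF buffer, so a write becomes
// durable only after the next buffer flush and fsync.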
#[async_trait]
impl Storage for HybridStorage {
    async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
        let start = Instant::now();
        let result = self.cache.get(key).map(|entry| entry.value().clone());
        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
        }).await;

        Ok(result)
    }

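    // Write path: update the cache first so the new value is immediately
    // visible to readers, then log the operation to the AOF buffer.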
    async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
        let start = Instant::now();

        self.cache.insert(key.clone(), value.clone());

        let entry = AOFEntry::Set { key, value };
        self.add_to_buffer(entry).await?;

        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
            stats.total_keys = self.cache.len() as u64;
            stats.memory_usage = self.calculate_memory_usage();
            stats.disk_usage = self.calculate_disk_usage();
        }).await;

        Ok(())
    }

    async fn delete(&self, key: &str) -> MapletResult<bool> {
        let start = Instant::now();
        let existed = self.cache.remove(key).is_some();

        if existed {
            let entry = AOFEntry::Delete { key: key.to_string() };
            self.add_to_buffer(entry).await?;
        }

        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
            stats.total_keys = self.cache.len() as u64;
            stats.memory_usage = self.calculate_memory_usage();
            stats.disk_usage = self.calculate_disk_usage();
        }).await;

        Ok(existed)
    }

    async fn exists(&self, key: &str) -> MapletResult<bool> {
        let start = Instant::now();
        let exists = self.cache.contains_key(key);
        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
        }).await;

        Ok(exists)
    }

    async fn keys(&self) -> MapletResult<Vec<String>> {
        let start = Instant::now();
        let keys: Vec<String> = self.cache.iter().map(|entry| entry.key().clone()).collect();
        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
        }).await;

        Ok(keys)
    }

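    // Clearing drops the cache, discards any buffered entries, and truncates
    // the AOF so a restart does not replay stale operations.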
    async fn clear_database(&self) -> MapletResult<()> {
        let start = Instant::now();
        self.cache.clear();

        {
            let mut buffer = self.write_buffer.write().await;
            buffer.clear();
        }

        std::fs::write(&self.aof_path, "")
            .map_err(|e| MapletError::Internal(format!("Failed to clear AOF file: {e}")))?;

        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
            stats.total_keys = 0;
            stats.memory_usage = 0;
            stats.disk_usage = 0;
        }).await;

        Ok(())
    }

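    // Flush forces any buffered entries onto disk and fsyncs the AOF, making
    // all writes issued so far durable.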
    async fn flush(&self) -> MapletResult<()> {
        let start = Instant::now();

        {
            let mut buffer = self.write_buffer.write().await;
            self.flush_buffer_internal(&mut buffer)?;
        }

        // Skip the fsync if nothing has ever been written: the AOF does not
        // exist yet and opening it would fail.
        if self.aof_path.exists() {
            std::fs::File::open(&self.aof_path)
                .and_then(|f| f.sync_all())
                .map_err(|e| MapletError::Internal(format!("Failed to sync AOF file: {e}")))?;
        }

        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
        }).await;

        Ok(())
    }

    async fn close(&self) -> MapletResult<()> {
        if let Some(handle) = &self.sync_handle {
            handle.abort();
        }

        self.flush().await?;

        Ok(())
    }

    async fn stats(&self) -> MapletResult<StorageStats> {
        let mut stats = self.stats.read().await.clone();
        stats.total_keys = self.cache.len() as u64;
        stats.memory_usage = self.calculate_memory_usage();
        stats.disk_usage = self.calculate_disk_usage();
        Ok(stats)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_hybrid_storage_basic_operations() {
        let temp_dir = TempDir::new().unwrap();
        let config = StorageConfig {
            data_dir: temp_dir.path().to_string_lossy().to_string(),
            sync_interval: 1,
            write_buffer_size: 10,
            ..Default::default()
        };
        let storage = HybridStorage::new(config).unwrap();

        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
        let value = storage.get("key1").await.unwrap();
        assert_eq!(value, Some(b"value1".to_vec()));

        assert!(storage.exists("key1").await.unwrap());
        assert!(!storage.exists("key2").await.unwrap());

        let deleted = storage.delete("key1").await.unwrap();
        assert!(deleted);
        assert!(!storage.exists("key1").await.unwrap());
    }

    #[tokio::test]
    async fn test_hybrid_storage_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = StorageConfig {
            data_dir: temp_dir.path().to_string_lossy().to_string(),
            sync_interval: 1,
            write_buffer_size: 10,
            ..Default::default()
        };

        {
            let storage = HybridStorage::new(config.clone()).unwrap();
            storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
            storage.flush().await.unwrap();
        }

        {
            let storage = HybridStorage::new(config).unwrap();
            let value = storage.get("key1").await.unwrap();
            assert_eq!(value, Some(b"value1".to_vec()));
        }
    }

    #[tokio::test]
    async fn test_hybrid_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = StorageConfig {
            data_dir: temp_dir.path().to_string_lossy().to_string(),
            sync_interval: 1,
            write_buffer_size: 10,
            ..Default::default()
        };
        let storage = HybridStorage::new(config).unwrap();

        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
        storage.flush().await.unwrap();

        let stats = storage.stats().await.unwrap();
        assert_eq!(stats.total_keys, 1);
        assert!(stats.memory_usage > 0);
        assert!(stats.disk_usage > 0);
        assert!(stats.operations_count > 0);
    }
}