// mappy_core/storage/hybrid.rs

use super::{Storage, StorageConfig, StorageStats};
use crate::{MapletError, MapletResult};
use async_trait::async_trait;
use dashmap::DashMap;
use serde_json;
use std::fs::OpenOptions;
use std::io::{BufRead, BufReader, Write};
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio::time::{Duration, interval};
17
/// Hybrid storage engine: an in-memory `DashMap` cache that serves all reads,
/// backed by an append-only file (AOF) of JSON-encoded entries for durability.
pub struct HybridStorage {
    /// In-memory key/value cache; the authoritative store for reads.
    cache: Arc<DashMap<String, Vec<u8>>>,
    /// Path to the append-only log file (`mappy.aof` under the data dir).
    aof_path: std::path::PathBuf,
    /// Storage configuration (data dir, write buffer size, sync interval).
    config: StorageConfig,
    /// Shared operation statistics, updated on every storage operation.
    stats: Arc<RwLock<StorageStats>>,
    // NOTE(review): currently unused (hence the allow) — presumably intended
    // for future uptime reporting; confirm before removing.
    #[allow(dead_code)]
    start_time: Instant,
    /// Handle to the background flush/fsync task, set by `start_sync_task`.
    sync_handle: Option<tokio::task::JoinHandle<()>>,
    /// Entries accepted but not yet appended to the AOF file.
    write_buffer: Arc<RwLock<Vec<AOFEntry>>>,
}
36
/// One record in the append-only log, serialized as a single JSON line.
/// Variant and field names are part of the on-disk format — do not rename.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
enum AOFEntry {
    /// A key was written with the given value.
    Set { key: String, value: Vec<u8> },
    /// A key was removed.
    Delete { key: String },
}
42
43impl HybridStorage {
44 pub fn new(config: StorageConfig) -> MapletResult<Self> {
46 std::fs::create_dir_all(&config.data_dir)
48 .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {e}")))?;
49
50 let aof_path = Path::new(&config.data_dir).join("mappy.aof");
51
52 let mut storage = Self {
53 cache: Arc::new(DashMap::new()),
54 aof_path,
55 config,
56 stats: Arc::new(RwLock::new(StorageStats::default())),
57 start_time: Instant::now(),
58 sync_handle: None,
59 write_buffer: Arc::new(RwLock::new(Vec::new())),
60 };
61
62 storage.load_from_aof()?;
64
65 storage.start_sync_task();
67
68 Ok(storage)
69 }
70
71 fn load_from_aof(&self) -> MapletResult<()> {
73 if !self.aof_path.exists() {
74 return Ok(());
75 }
76
77 let file = std::fs::File::open(&self.aof_path)
78 .map_err(|e| MapletError::Internal(format!("Failed to open AOF file: {e}")))?;
79
80 let reader = BufReader::new(file);
81
82 for line in reader.lines() {
83 let line =
84 line.map_err(|e| MapletError::Internal(format!("Failed to read AOF line: {e}")))?;
85 if line.trim().is_empty() {
86 continue;
87 }
88
89 let entry: AOFEntry = serde_json::from_str(&line)
90 .map_err(|e| MapletError::Internal(format!("Failed to parse AOF entry: {e}")))?;
91
92 match entry {
93 AOFEntry::Set { key, value } => {
94 self.cache.insert(key, value);
95 }
96 AOFEntry::Delete { key } => {
97 self.cache.remove(&key);
98 }
99 }
100 }
101
102 Ok(())
103 }
104
105 async fn add_to_buffer(&self, entry: AOFEntry) -> MapletResult<()> {
107 {
108 let mut buffer = self.write_buffer.write().await;
109 buffer.push(entry);
110
111 if buffer.len() >= self.config.write_buffer_size {
113 self.flush_buffer_internal(&mut buffer)?;
114 }
115 }
116
117 Ok(())
118 }
119
120 fn flush_buffer_internal(&self, buffer: &mut Vec<AOFEntry>) -> MapletResult<()> {
122 if buffer.is_empty() {
123 return Ok(());
124 }
125
126 let mut file = OpenOptions::new()
127 .create(true)
128 .append(true)
129 .open(&self.aof_path)
130 .map_err(|e| {
131 MapletError::Internal(format!("Failed to open AOF file for append: {e}"))
132 })?;
133
134 for entry in buffer.iter() {
135 let line = serde_json::to_string(entry).map_err(|e| {
136 MapletError::Internal(format!("Failed to serialize AOF entry: {e}"))
137 })?;
138 writeln!(file, "{line}")
139 .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {e}")))?;
140 }
141
142 buffer.clear();
143 Ok(())
144 }
145
146 fn start_sync_task(&mut self) {
148 let write_buffer = Arc::clone(&self.write_buffer);
149 let aof_path = self.aof_path.clone();
150 let sync_interval = Duration::from_secs(self.config.sync_interval);
151
152 let handle = tokio::spawn(async move {
153 let mut interval = interval(sync_interval);
154
155 loop {
156 interval.tick().await;
157
158 {
160 let mut buffer = write_buffer.write().await;
161 if !buffer.is_empty()
162 && let Err(e) = Self::flush_buffer_internal_static(&mut buffer, &aof_path)
163 {
164 eprintln!("Failed to flush AOF buffer: {e}");
165 }
166 }
167
168 if let Err(e) = std::fs::File::open(&aof_path).and_then(|f| f.sync_all()) {
170 eprintln!("Failed to sync AOF file: {e}");
171 }
172 }
173 });
174
175 self.sync_handle = Some(handle);
176 }
177
178 fn flush_buffer_internal_static(
180 buffer: &mut Vec<AOFEntry>,
181 aof_path: &std::path::Path,
182 ) -> MapletResult<()> {
183 if buffer.is_empty() {
184 return Ok(());
185 }
186
187 let mut file = OpenOptions::new()
188 .create(true)
189 .append(true)
190 .open(aof_path)
191 .map_err(|e| {
192 MapletError::Internal(format!("Failed to open AOF file for append: {e}"))
193 })?;
194
195 for entry in buffer.iter() {
196 let line = serde_json::to_string(entry).map_err(|e| {
197 MapletError::Internal(format!("Failed to serialize AOF entry: {e}"))
198 })?;
199 writeln!(file, "{line}")
200 .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {e}")))?;
201 }
202
203 buffer.clear();
204 Ok(())
205 }
206
207 async fn update_stats<F>(&self, f: F)
209 where
210 F: FnOnce(&mut StorageStats),
211 {
212 let mut stats = self.stats.write().await;
213 f(&mut stats);
214 }
215
216 fn calculate_memory_usage(&self) -> u64 {
218 let mut total = 0;
219 for entry in self.cache.iter() {
220 total += entry.key().len() + entry.value().len();
221 }
222 total as u64
223 }
224
225 fn calculate_disk_usage(&self) -> u64 {
227 if self.aof_path.exists() {
228 std::fs::metadata(&self.aof_path)
229 .map(|m| m.len())
230 .unwrap_or(0)
231 } else {
232 0
233 }
234 }
235}
236
237#[async_trait]
238impl Storage for HybridStorage {
239 async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
240 let start = Instant::now();
241 let result = self.cache.get(key).map(|entry| entry.value().clone());
242 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
243
244 self.update_stats(|stats| {
245 stats.operations_count += 1;
246 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
247 })
248 .await;
249
250 Ok(result)
251 }
252
253 async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
254 let start = Instant::now();
255
256 self.cache.insert(key.clone(), value.clone());
258
259 let entry = AOFEntry::Set { key, value };
261 self.add_to_buffer(entry).await?;
262
263 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
264
265 self.update_stats(|stats| {
266 stats.operations_count += 1;
267 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
268 stats.total_keys = self.cache.len() as u64;
269 stats.memory_usage = self.calculate_memory_usage();
270 stats.disk_usage = self.calculate_disk_usage();
271 })
272 .await;
273
274 Ok(())
275 }
276
277 async fn delete(&self, key: &str) -> MapletResult<bool> {
278 let start = Instant::now();
279 let existed = self.cache.remove(key).is_some();
280
281 if existed {
282 let entry = AOFEntry::Delete {
284 key: key.to_string(),
285 };
286 self.add_to_buffer(entry).await?;
287 }
288
289 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
290
291 self.update_stats(|stats| {
292 stats.operations_count += 1;
293 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
294 stats.total_keys = self.cache.len() as u64;
295 stats.memory_usage = self.calculate_memory_usage();
296 stats.disk_usage = self.calculate_disk_usage();
297 })
298 .await;
299
300 Ok(existed)
301 }
302
303 async fn exists(&self, key: &str) -> MapletResult<bool> {
304 let start = Instant::now();
305 let exists = self.cache.contains_key(key);
306 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
307
308 self.update_stats(|stats| {
309 stats.operations_count += 1;
310 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
311 })
312 .await;
313
314 Ok(exists)
315 }
316
317 async fn keys(&self) -> MapletResult<Vec<String>> {
318 let start = Instant::now();
319 let keys: Vec<String> = self.cache.iter().map(|entry| entry.key().clone()).collect();
320 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
321
322 self.update_stats(|stats| {
323 stats.operations_count += 1;
324 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
325 })
326 .await;
327
328 Ok(keys)
329 }
330
331 async fn clear_database(&self) -> MapletResult<()> {
332 let start = Instant::now();
333 self.cache.clear();
334
335 {
337 let mut buffer = self.write_buffer.write().await;
338 buffer.clear();
339 }
340
341 std::fs::write(&self.aof_path, "")
343 .map_err(|e| MapletError::Internal(format!("Failed to clear AOF file: {e}")))?;
344
345 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
346
347 self.update_stats(|stats| {
348 stats.operations_count += 1;
349 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
350 stats.total_keys = 0;
351 stats.memory_usage = 0;
352 stats.disk_usage = 0;
353 })
354 .await;
355
356 Ok(())
357 }
358
359 async fn flush(&self) -> MapletResult<()> {
360 let start = Instant::now();
361
362 {
364 let mut buffer = self.write_buffer.write().await;
365 self.flush_buffer_internal(&mut buffer)?;
366 }
367
368 std::fs::File::open(&self.aof_path)
370 .and_then(|f| f.sync_all())
371 .map_err(|e| MapletError::Internal(format!("Failed to sync AOF file: {e}")))?;
372
373 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
374
375 self.update_stats(|stats| {
376 stats.operations_count += 1;
377 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
378 })
379 .await;
380
381 Ok(())
382 }
383
384 async fn close(&self) -> MapletResult<()> {
385 if let Some(handle) = &self.sync_handle {
387 handle.abort();
388 }
389
390 self.flush().await?;
392
393 Ok(())
394 }
395
396 async fn stats(&self) -> MapletResult<StorageStats> {
397 let mut stats = self.stats.read().await.clone();
398 stats.total_keys = self.cache.len() as u64;
399 stats.memory_usage = self.calculate_memory_usage();
400 stats.disk_usage = self.calculate_disk_usage();
401 Ok(stats)
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a storage config rooted at `dir` with small, test-friendly
    /// limits (1 s sync interval, 10-entry write buffer).
    fn test_config(dir: &TempDir) -> StorageConfig {
        StorageConfig {
            data_dir: dir.path().to_string_lossy().to_string(),
            sync_interval: 1,
            write_buffer_size: 10,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_hybrid_storage_basic_operations() {
        let dir = TempDir::new().unwrap();
        let store = HybridStorage::new(test_config(&dir)).unwrap();

        // Round-trip a single key.
        store
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();
        assert_eq!(store.get("key1").await.unwrap(), Some(b"value1".to_vec()));

        // Existence checks for present and absent keys.
        assert!(store.exists("key1").await.unwrap());
        assert!(!store.exists("key2").await.unwrap());

        // Deleting an existing key reports true and removes it.
        assert!(store.delete("key1").await.unwrap());
        assert!(!store.exists("key1").await.unwrap());
    }

    #[tokio::test]
    async fn test_hybrid_storage_persistence() {
        let dir = TempDir::new().unwrap();

        // First instance writes and flushes, then goes out of scope.
        {
            let store = HybridStorage::new(test_config(&dir)).unwrap();
            store
                .set("key1".to_string(), b"value1".to_vec())
                .await
                .unwrap();
            store.flush().await.unwrap();
        }

        // A second instance over the same directory replays the AOF.
        {
            let store = HybridStorage::new(test_config(&dir)).unwrap();
            assert_eq!(store.get("key1").await.unwrap(), Some(b"value1".to_vec()));
        }
    }

    #[tokio::test]
    async fn test_hybrid_storage_stats() {
        let dir = TempDir::new().unwrap();
        let store = HybridStorage::new(test_config(&dir)).unwrap();

        store
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();
        store.flush().await.unwrap();

        // After one write + flush, all counters should be non-trivial.
        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_keys, 1);
        assert!(stats.memory_usage > 0);
        assert!(stats.disk_usage > 0);
        assert!(stats.operations_count > 0);
    }
}