mappy_core/
engine.rs

1//! Engine layer for mappy
2//! 
3//! Integrates storage backends with maplet functionality to provide a complete key-value store.
4
5use crate::{Maplet, MapletResult, MergeOperator};
6use crate::types::MapletConfig;
7use crate::storage::{Storage, StorageStats, StorageConfig, PersistenceMode};
8use crate::storage::memory::MemoryStorage;
9use crate::storage::disk::DiskStorage;
10use crate::storage::aof::AOFStorage;
11use crate::storage::hybrid::HybridStorage;
12use crate::ttl::{TTLManager, TTLConfig, TTLStats};
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use serde::{Serialize, Deserialize};
16use std::time::SystemTime;
17
18/// Simple merge operator for Vec<u8> that replaces values
19#[derive(Debug, Clone, Default)]
20pub struct ReplaceOperator;
21
22impl MergeOperator<Vec<u8>> for ReplaceOperator {
23    fn merge(&self, _existing: Vec<u8>, new: Vec<u8>) -> MapletResult<Vec<u8>> {
24        Ok(new)
25    }
26
27    fn identity(&self) -> Vec<u8> {
28        Vec::new()
29    }
30}
31
32/// Engine configuration
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct EngineConfig {
35    /// Maplet configuration
36    pub maplet: MapletConfig,
37    /// Storage configuration
38    pub storage: StorageConfig,
39    /// TTL configuration
40    pub ttl: TTLConfig,
41    /// Persistence mode
42    pub persistence_mode: PersistenceMode,
43    /// Data directory for persistent storage
44    pub data_dir: Option<String>,
45}
46
47impl Default for EngineConfig {
48    fn default() -> Self {
49        Self {
50            maplet: MapletConfig::default(),
51            storage: StorageConfig::default(),
52            ttl: TTLConfig::default(),
53            persistence_mode: PersistenceMode::Memory,
54            data_dir: None,
55        }
56    }
57}
58
59/// Engine statistics
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct EngineStats {
62    /// Maplet statistics
63    pub maplet_stats: crate::MapletStats,
64    /// Storage statistics
65    pub storage_stats: StorageStats,
66    /// TTL statistics
67    pub ttl_stats: TTLStats,
68    /// Engine uptime in seconds
69    pub uptime_seconds: u64,
70    /// Total operations performed
71    pub total_operations: u64,
72}
73
74/// Main engine that combines maplet with storage
75pub struct Engine {
76    /// The maplet for approximate key-value operations
77    maplet: Arc<RwLock<Maplet<String, Vec<u8>, ReplaceOperator>>>,
78    /// The storage backend
79    storage: Arc<dyn Storage>,
80    /// TTL manager
81    ttl_manager: Arc<TTLManager>,
82    /// Engine configuration
83    config: EngineConfig,
84    /// Start time for uptime calculation
85    start_time: SystemTime,
86    /// Operation counter
87    operation_count: Arc<RwLock<u64>>,
88}
89
90impl Engine {
91    /// Create a new engine with the given configuration
92    pub async fn new(config: EngineConfig) -> MapletResult<Self> {
93        let maplet = Arc::new(RwLock::new(
94            Maplet::<String, Vec<u8>, ReplaceOperator>::with_config(config.maplet.clone())?
95        ));
96
97        let storage: Arc<dyn Storage> = match config.persistence_mode {
98            PersistenceMode::Memory => {
99                Arc::new(MemoryStorage::new(config.storage.clone()).await?)
100            },
101            PersistenceMode::Disk => {
102                Arc::new(DiskStorage::new(config.storage.clone()).await?)
103            },
104            PersistenceMode::AOF => {
105                Arc::new(AOFStorage::new(config.storage.clone()).await?)
106            },
107            PersistenceMode::Hybrid => {
108                Arc::new(HybridStorage::new(config.storage.clone()).await?)
109            },
110        };
111
112        // Create TTL manager
113        let ttl_manager = Arc::new(TTLManager::new(config.ttl.clone()));
114
115        // Start TTL cleanup task
116        let storage_clone = Arc::clone(&storage);
117        ttl_manager.start_cleanup(move |expired_entries| {
118            let storage = Arc::clone(&storage_clone);
119            
120            // Spawn async task for cleanup
121            tokio::spawn(async move {
122                for entry in expired_entries {
123                    // Remove expired key from storage
124                    let _ = storage.delete(&entry.key).await;
125                }
126            });
127            
128            Ok(())
129        }).await?;
130
131        Ok(Self {
132            maplet,
133            storage,
134            ttl_manager,
135            config,
136            start_time: SystemTime::now(),
137            operation_count: Arc::new(RwLock::new(0)),
138        })
139    }
140
141    /// Get a value by key
142    pub async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
143        // Check if key has expired
144        if self.ttl_manager.is_expired(key).await? {
145            // Remove expired key
146            self.ttl_manager.remove_ttl(key).await?;
147            let _ = self.storage.delete(key).await;
148            return Ok(None);
149        }
150
151        // First check the maplet for approximate membership
152        let maplet_guard = self.maplet.read().await;
153        if !maplet_guard.contains(&key.to_string()).await {
154            drop(maplet_guard);
155            return Ok(None);
156        }
157        drop(maplet_guard);
158
159        // If the key exists in the maplet, get the actual value from storage
160        let result = self.storage.get(key).await;
161        
162        // Increment operation counter
163        {
164            let mut count = self.operation_count.write().await;
165            *count += 1;
166        }
167
168        result
169    }
170
171    /// Set a key-value pair
172    pub async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
173        // Store in the maplet for approximate membership
174        {
175            let maplet_guard = self.maplet.write().await;
176            maplet_guard.insert(key.clone(), value.clone()).await?;
177        }
178
179        // Store in the actual storage backend
180        self.storage.set(key, value).await?;
181
182        // Increment operation counter
183        {
184            let mut count = self.operation_count.write().await;
185            *count += 1;
186        }
187
188        Ok(())
189    }
190
191    /// Delete a key
192    pub async fn delete(&self, key: &str) -> MapletResult<bool> {
193        // Remove from storage
194        let result = self.storage.delete(key).await?;
195
196        // If the key was successfully deleted from storage, we could remove it from maplet
197        // but since maplet is just for approximate membership, we'll leave it as is
198        // The false positives will be handled by the storage layer
199
200        // Increment operation counter
201        {
202            let mut count = self.operation_count.write().await;
203            *count += 1;
204        }
205
206        Ok(result)
207    }
208
209    /// Check if a key exists
210    pub async fn exists(&self, key: &str) -> MapletResult<bool> {
211        // Check maplet first for fast approximate membership
212        let maplet_guard = self.maplet.read().await;
213        if !maplet_guard.contains(&key.to_string()).await {
214            drop(maplet_guard);
215            return Ok(false);
216        }
217        drop(maplet_guard);
218
219        // If it exists in maplet, check storage for definitive answer
220        let result = self.storage.exists(key).await?;
221
222        // Increment operation counter
223        {
224            let mut count = self.operation_count.write().await;
225            *count += 1;
226        }
227
228        Ok(result)
229    }
230
231    /// Get all keys
232    pub async fn keys(&self) -> MapletResult<Vec<String>> {
233        let result = self.storage.keys().await?;
234
235        // Increment operation counter
236        {
237            let mut count = self.operation_count.write().await;
238            *count += 1;
239        }
240
241        Ok(result)
242    }
243
244    /// Clear all data
245    pub async fn clear(&self) -> MapletResult<()> {
246        // Clear maplet
247        {
248            let mut maplet_guard = self.maplet.write().await;
249            // Note: Maplet doesn't have a clear method, so we create a new one
250            *maplet_guard = Maplet::<String, Vec<u8>, ReplaceOperator>::with_config(self.config.maplet.clone())?;
251        }
252
253        // Clear storage
254        self.storage.clear_database().await?;
255
256        // Reset operation counter
257        {
258            let mut count = self.operation_count.write().await;
259            *count = 0;
260        }
261
262        Ok(())
263    }
264
265    /// Flush data to persistent storage
266    pub async fn flush(&self) -> MapletResult<()> {
267        self.storage.flush().await?;
268        Ok(())
269    }
270
271    /// Close the engine and cleanup resources
272    pub async fn close(&self) -> MapletResult<()> {
273        // Stop TTL cleanup task
274        self.ttl_manager.stop_cleanup().await?;
275        
276        // Close storage
277        self.storage.close().await?;
278        Ok(())
279    }
280
281    /// Get engine statistics
282    pub async fn stats(&self) -> MapletResult<EngineStats> {
283        let maplet_guard = self.maplet.read().await;
284        let maplet_stats = maplet_guard.stats().await;
285        drop(maplet_guard);
286
287        let storage_stats = self.storage.stats().await?;
288        let ttl_stats = self.ttl_manager.get_stats().await?;
289        let operation_count = *self.operation_count.read().await;
290        
291        let uptime = self.start_time.elapsed()
292            .unwrap_or_default()
293            .as_secs();
294
295        Ok(EngineStats {
296            maplet_stats,
297            storage_stats,
298            ttl_stats,
299            uptime_seconds: uptime,
300            total_operations: operation_count,
301        })
302    }
303
304    /// Get memory usage in bytes
305    pub async fn memory_usage(&self) -> MapletResult<u64> {
306        let storage_stats = self.storage.stats().await?;
307        Ok(storage_stats.memory_usage)
308    }
309
310    /// Get the persistence mode
311    pub fn persistence_mode(&self) -> PersistenceMode {
312        self.config.persistence_mode.clone()
313    }
314
315    /// Get the engine configuration
316    pub fn config(&self) -> &EngineConfig {
317        &self.config
318    }
319
320    /// Set TTL for a key
321    pub async fn expire(&self, key: &str, ttl_seconds: u64) -> MapletResult<bool> {
322        // Check if key exists
323        if !self.exists(key).await? {
324            return Ok(false);
325        }
326
327        // Set TTL
328        self.ttl_manager.set_ttl(key.to_string(), 0, ttl_seconds).await?;
329        
330        // Increment operation counter
331        {
332            let mut count = self.operation_count.write().await;
333            *count += 1;
334        }
335
336        Ok(true)
337    }
338
339    /// Get TTL for a key in seconds
340    pub async fn ttl(&self, key: &str) -> MapletResult<Option<i64>> {
341        let result = self.ttl_manager.get_ttl(key).await?;
342        
343        // Increment operation counter
344        {
345            let mut count = self.operation_count.write().await;
346            *count += 1;
347        }
348
349        Ok(result)
350    }
351
352    /// Remove TTL for a key
353    pub async fn persist(&self, key: &str) -> MapletResult<bool> {
354        let had_ttl = self.ttl_manager.get_ttl(key).await?.is_some();
355        self.ttl_manager.remove_ttl(key).await?;
356        
357        // Increment operation counter
358        {
359            let mut count = self.operation_count.write().await;
360            *count += 1;
361        }
362
363        Ok(had_ttl)
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370    use tempfile::TempDir;
371
372    #[tokio::test]
373    async fn test_engine_creation() {
374        let config = EngineConfig::default();
375        let engine = Engine::new(config).await.unwrap();
376        assert_eq!(engine.persistence_mode(), PersistenceMode::Memory);
377    }
378
379    #[tokio::test]
380    async fn test_engine_basic_operations() {
381        let config = EngineConfig::default();
382        let engine = Engine::new(config).await.unwrap();
383
384        // Test set and get
385        engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
386        let value = engine.get("key1").await.unwrap();
387        assert_eq!(value, Some(b"value1".to_vec()));
388
389        // Test exists
390        assert!(engine.exists("key1").await.unwrap());
391        assert!(!engine.exists("nonexistent").await.unwrap());
392
393        // Test delete
394        let deleted = engine.delete("key1").await.unwrap();
395        assert!(deleted);
396        assert!(!engine.exists("key1").await.unwrap());
397    }
398
399    #[tokio::test]
400    async fn test_engine_with_disk_storage() {
401        let temp_dir = TempDir::new().unwrap();
402        let config = EngineConfig {
403            persistence_mode: PersistenceMode::Disk,
404            data_dir: Some(temp_dir.path().to_string_lossy().to_string()),
405            ..Default::default()
406        };
407        
408        let engine = Engine::new(config).await.unwrap();
409        
410        // Test operations
411        engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
412        let value = engine.get("key1").await.unwrap();
413        assert_eq!(value, Some(b"value1".to_vec()));
414    }
415
416    #[tokio::test]
417    async fn test_engine_stats() {
418        let config = EngineConfig::default();
419        let engine = Engine::new(config).await.unwrap();
420
421        // Perform some operations
422        engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
423        engine.get("key1").await.unwrap();
424
425        let stats = engine.stats().await.unwrap();
426        assert!(stats.total_operations > 0);
427        assert!(stats.uptime_seconds >= 0);
428    }
429
430    #[tokio::test]
431    async fn test_engine_ttl_operations() {
432        let config = EngineConfig::default();
433        let engine = Engine::new(config).await.unwrap();
434
435        // Set a key
436        engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
437        
438        // Set TTL
439        let result = engine.expire("key1", 60).await.unwrap();
440        assert!(result);
441
442        // Check TTL
443        let ttl = engine.ttl("key1").await.unwrap();
444        assert!(ttl.is_some());
445        assert!(ttl.unwrap() <= 60);
446
447        // Remove TTL
448        let had_ttl = engine.persist("key1").await.unwrap();
449        assert!(had_ttl);
450
451        // Check TTL is gone
452        let ttl = engine.ttl("key1").await.unwrap();
453        assert!(ttl.is_none());
454    }
455
456    #[tokio::test]
457    async fn test_engine_ttl_expiration() {
458        let config = EngineConfig::default();
459        let engine = Engine::new(config).await.unwrap();
460
461        // Set a key with very short TTL
462        engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
463        engine.expire("key1", 1).await.unwrap();
464        
465        // Wait for expiration
466        tokio::time::sleep(tokio::time::Duration::from_millis(1100)).await;
467        
468        // Key should be expired
469        let value = engine.get("key1").await.unwrap();
470        assert!(value.is_none());
471    }
472}