kv_core/
engine.rs

1//! Main KV engine implementation
2//! 
3//! Orchestrates storage, encryption, TTL management, and data structures.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{error, info};
9
10use crate::{
11    KVError, KVResult, Key, Value, Entry, DatabaseId, 
12    KVConfig, KVStats, TTL, StorageFactory,
13    encryption::KeyManager,
14    ttl::{TTLManager, TTLSupport},
15    storage::Storage as StorageTrait,
16    pubsub::{PubSubManager, ChannelPattern},
17};
18
/// Main KV engine
///
/// Bundles the per-database storage backends with the encryption key,
/// TTL and Pub/Sub managers plus the runtime statistics. Every shared
/// component is wrapped in `Arc<RwLock<_>>`.
pub struct KVEngine {
    /// Configuration; its persistence mode and data directory are read
    /// whenever a storage backend is created
    config: KVConfig,
    /// Storage backends per database, created on first use of a database id
    storages: Arc<RwLock<HashMap<DatabaseId, Box<dyn StorageTrait>>>>,
    /// Encryption key manager, built from the configured master key
    /// (not read by any method of this `impl`, hence the `_` prefix)
    _key_manager: Arc<RwLock<KeyManager>>,
    /// TTL manager that tracks per-key expirations and reports expired keys
    ttl_manager: Arc<RwLock<TTLManager>>,
    /// Pub/Sub manager, also used for cache invalidation messages
    pubsub_manager: Arc<RwLock<PubSubManager>>,
    /// Statistics (operation counter, uptime, derived throughput)
    stats: Arc<RwLock<KVStats>>,
    /// Start time for uptime calculation
    start_time: std::time::Instant,
}
36
37impl KVEngine {
38    /// Create a new KV engine
39    /// 
40    /// # Errors
41    /// Returns error if storage initialization fails
42    pub async fn new(config: KVConfig) -> KVResult<Self> {
43        info!("Initializing KV engine with config: {:?}", config);
44        
45        // Initialize key manager
46        let key_manager = KeyManager::new(&config.master_key)?;
47        
48        // Initialize TTL manager
49        let ttl_manager = TTLManager::new(
50            std::time::Duration::from_secs(config.expiration_check_interval)
51        );
52        
53        // Initialize Pub/Sub manager
54        let mut pubsub_manager = PubSubManager::default();
55        pubsub_manager.start_cleanup();
56        
57        let engine = Self {
58            config: config.clone(),
59            storages: Arc::new(RwLock::new(HashMap::new())),
60            _key_manager: Arc::new(RwLock::new(key_manager)),
61            ttl_manager: Arc::new(RwLock::new(ttl_manager)),
62            pubsub_manager: Arc::new(RwLock::new(pubsub_manager)),
63            stats: Arc::new(RwLock::new(KVStats {
64                total_keys: 0,
65                expired_keys: 0,
66                memory_usage: 0,
67                disk_usage: 0,
68                total_operations: 0,
69                ops_per_second: 0.0,
70                uptime: 0,
71                active_connections: 0,
72            })),
73            start_time: std::time::Instant::now(),
74        };
75        
76        // Start TTL cleanup
77        engine.start_ttl_cleanup().await;
78        
79        // Initialize storage for default database
80        engine.ensure_storage(0).await?;
81        
82        info!("KV engine initialized successfully");
83        Ok(engine)
84    }
85
86    /// Ensure storage exists for a database
87    #[allow(clippy::significant_drop_tightening)]
88    async fn ensure_storage(&self, database_id: DatabaseId) -> KVResult<()> {
89        let mut storages = self.storages.write().await;
90        if let std::collections::hash_map::Entry::Vacant(e) = storages.entry(database_id) {
91            let storage = StorageFactory::create(
92                self.config.persistence_mode,
93                &self.config.data_dir,
94                database_id,
95            ).await?;
96            e.insert(storage);
97        }
98        Ok(())
99    }
100
101    /// Get storage for a database
102    #[allow(clippy::option_if_let_else)]
103    async fn _get_storage(&self, database_id: DatabaseId) -> KVResult<Arc<Box<dyn StorageTrait>>> {
104        self.ensure_storage(database_id).await?;
105        let storages = self.storages.read().await;
106        if let Some(_storage) = storages.get(&database_id) {
107            // We need to return a reference, but we can't clone the trait object
108            // This is a limitation of the current design - in a real implementation
109            // we'd use Arc<dyn StorageTrait> instead of Box<dyn StorageTrait>
110            Err(KVError::Internal("Storage access not implemented".to_string()))
111        } else {
112            Err(KVError::Internal("Storage not found".to_string()))
113        }
114    }
115
116    /// Start TTL cleanup background task
117    async fn start_ttl_cleanup(&self) {
118        let ttl_manager = Arc::clone(&self.ttl_manager);
119        let storages = Arc::clone(&self.storages);
120        
121        {
122            let mut ttl_manager_guard = ttl_manager.write().await;
123            ttl_manager_guard.start_cleanup(move |expired_keys| {
124                let storages = Arc::clone(&storages);
125                
126                tokio::spawn(async move {
127                    for key in expired_keys {
128                        // Remove from all databases
129                        let storages_guard = storages.read().await;
130                        for (database_id, storage) in storages_guard.iter() {
131                            if let Err(e) = storage.delete(*database_id, &key).await {
132                                error!("Failed to delete expired key {} from database {}: {}", key, database_id, e);
133                            }
134                        }
135                    }
136                });
137            });
138        }
139    }
140
141    /// Update statistics
142    async fn update_stats(&self, operation_count: u64) {
143        let mut stats = self.stats.write().await;
144        stats.total_operations += operation_count;
145        stats.uptime = self.start_time.elapsed().as_secs();
146        
147        if stats.uptime > 0 {
148            #[allow(clippy::cast_precision_loss)]
149            let ops_per_second = stats.total_operations as f64 / stats.uptime as f64;
150            stats.ops_per_second = ops_per_second;
151        }
152    }
153
154    // Basic KV operations
155
156    /// Get a value by key
157    /// 
158    /// # Errors
159    /// Returns error if storage operation fails
160    #[allow(clippy::significant_drop_tightening)]
161    pub async fn get(&self, database_id: DatabaseId, key: &Key) -> KVResult<Option<Value>> {
162        self.ensure_storage(database_id).await?;
163        
164        let storages = self.storages.read().await;
165        let storage = storages.get(&database_id)
166            .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
167        
168        let entry = storage.get(database_id, key).await?;
169        
170        if let Some(mut entry) = entry {
171            // Check if expired
172            if entry.is_expired() {
173                // Remove expired entry
174                drop(storages); // Release read lock
175                let _ = self.delete(database_id, key).await;
176                return Ok(None);
177            }
178            
179            // Touch entry (update access count and time)
180            entry.touch();
181            
182            // Update in storage
183            drop(storages); // Release read lock
184            let mut storages = self.storages.write().await;
185            let storage = storages.get_mut(&database_id)
186                .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
187            storage.set(database_id, key.clone(), entry.clone()).await?;
188            
189            self.update_stats(1).await;
190            Ok(Some(entry.value))
191        } else {
192            self.update_stats(1).await;
193            Ok(None)
194        }
195    }
196
197    /// Set a value with optional TTL
198    /// 
199    /// # Errors
200    /// Returns error if storage operation fails
201    #[allow(clippy::significant_drop_tightening)]
202    pub async fn set(&self, database_id: DatabaseId, key: Key, value: Value, ttl: Option<TTL>) -> KVResult<()> {
203        self.ensure_storage(database_id).await?;
204        
205        let entry = Entry::new(value, ttl);
206        
207        let mut storages = self.storages.write().await;
208        let storage = storages.get_mut(&database_id)
209            .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
210        
211        storage.set(database_id, key.clone(), entry).await?;
212        
213        // Update TTL manager if TTL is set
214        if let Some(ttl) = ttl {
215            let ttl_manager = self.ttl_manager.read().await;
216            ttl_manager.set_ttl(key.clone(), ttl).await?;
217        }
218        
219        // Publish cache invalidation event
220        if let Err(e) = self.publish_invalidation(&key).await {
221            error!("Failed to publish invalidation for key {}: {}", key, e);
222        }
223        
224        self.update_stats(1).await;
225        Ok(())
226    }
227
228    /// Delete a key
229    /// 
230    /// # Errors
231    /// Returns error if storage operation fails
232    #[allow(clippy::significant_drop_tightening)]
233    pub async fn delete(&self, database_id: DatabaseId, key: &Key) -> KVResult<bool> {
234        self.ensure_storage(database_id).await?;
235        
236        let mut storages = self.storages.write().await;
237        let storage = storages.get_mut(&database_id)
238            .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
239        
240        let deleted = storage.delete(database_id, key).await?;
241        
242        if deleted {
243            // Remove from TTL manager
244            let ttl_manager = self.ttl_manager.read().await;
245            let _ = ttl_manager.remove_ttl(key).await;
246            
247            // Publish cache invalidation event
248            if let Err(e) = self.publish_invalidation(key).await {
249                error!("Failed to publish invalidation for key {}: {}", key, e);
250            }
251        }
252        
253        self.update_stats(1).await;
254        Ok(deleted)
255    }
256
257    /// Check if a key exists
258    /// 
259    /// # Errors
260    /// Returns error if storage operation fails
261    #[allow(clippy::significant_drop_tightening)]
262    pub async fn exists(&self, database_id: DatabaseId, key: &Key) -> KVResult<bool> {
263        self.ensure_storage(database_id).await?;
264        
265        let storages = self.storages.read().await;
266        let storage = storages.get(&database_id)
267            .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
268        
269        let exists = storage.exists(database_id, key).await?;
270        
271        self.update_stats(1).await;
272        Ok(exists)
273    }
274
275    /// Set TTL for an existing key
276    /// 
277    /// # Errors
278    /// Returns error if storage operation fails
279    #[allow(clippy::significant_drop_tightening)]
280    pub async fn expire(&self, database_id: DatabaseId, key: &Key, ttl: TTL) -> KVResult<bool> {
281        self.ensure_storage(database_id).await?;
282        
283        // Check if key exists
284        if !self.exists(database_id, key).await? {
285            return Ok(false);
286        }
287        
288        // Update TTL
289        let ttl_manager = self.ttl_manager.read().await;
290        ttl_manager.set_ttl(key.clone(), ttl).await?;
291        
292        // Update entry in storage
293        let storages = self.storages.read().await;
294        let storage = storages.get(&database_id)
295            .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
296        
297        if let Some(mut entry) = storage.get(database_id, key).await? {
298            entry.set_ttl(ttl);
299            drop(storages); // Release read lock
300            let mut storages = self.storages.write().await;
301            let storage = storages.get_mut(&database_id)
302                .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
303            storage.set(database_id, key.clone(), entry).await?;
304        }
305        
306        self.update_stats(1).await;
307        Ok(true)
308    }
309
310    /// Get remaining TTL for a key
311    /// 
312    /// # Errors
313    /// Returns error if storage operation fails
314    #[allow(clippy::significant_drop_tightening)]
315    pub async fn ttl(&self, database_id: DatabaseId, key: &Key) -> KVResult<Option<TTL>> {
316        self.ensure_storage(database_id).await?;
317        
318        let ttl_manager = self.ttl_manager.read().await;
319        let ttl = ttl_manager.get_ttl(key).await?;
320        
321        self.update_stats(1).await;
322        Ok(ttl)
323    }
324
325    /// Get all keys in a database
326    /// 
327    /// # Errors
328    /// Returns error if storage operation fails
329    #[allow(clippy::significant_drop_tightening)]
330    pub async fn keys(&self, database_id: DatabaseId) -> KVResult<Vec<Key>> {
331        self.ensure_storage(database_id).await?;
332        
333        let storages = self.storages.read().await;
334        let storage = storages.get(&database_id)
335            .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
336        
337        let keys = storage.keys(database_id).await?;
338        
339        self.update_stats(1).await;
340        Ok(keys)
341    }
342
343    /// Get keys matching a pattern
344    /// 
345    /// # Errors
346    /// Returns error if storage operation fails
347    #[allow(clippy::significant_drop_tightening)]
348    pub async fn keys_pattern(&self, database_id: DatabaseId, pattern: &str) -> KVResult<Vec<Key>> {
349        self.ensure_storage(database_id).await?;
350        
351        let storages = self.storages.read().await;
352        let storage = storages.get(&database_id)
353            .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
354        
355        let keys = storage.keys_pattern(database_id, pattern).await?;
356        
357        self.update_stats(1).await;
358        Ok(keys)
359    }
360
361    /// Clear all data in a database
362    /// 
363    /// # Errors
364    /// Returns error if storage operation fails
365    #[allow(clippy::significant_drop_tightening)]
366    pub async fn clear_database(&self, database_id: DatabaseId) -> KVResult<()> {
367        self.ensure_storage(database_id).await?;
368        
369        let mut storages = self.storages.write().await;
370        let storage = storages.get_mut(&database_id)
371            .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
372        
373        storage.clear_database(database_id).await?;
374        
375        // Clear TTL information for this database
376        let ttl_manager = self.ttl_manager.read().await;
377        ttl_manager.clear_all().await;
378        
379        self.update_stats(1).await;
380        Ok(())
381    }
382
383    /// Get engine statistics
384    /// 
385    /// # Errors
386    /// Returns error if statistics calculation fails
387    #[allow(clippy::significant_drop_tightening)]
388    pub async fn get_stats(&self) -> KVResult<KVStats> {
389        let mut stats = self.stats.read().await.clone();
390        
391        // Update memory and disk usage
392        let storages = self.storages.read().await;
393        let mut total_memory = 0u64;
394        let mut total_disk = 0u64;
395        let mut total_keys = 0u64;
396        
397        for (database_id, storage) in storages.iter() {
398            if let Ok(storage_stats) = storage.get_stats(*database_id).await {
399                total_memory += storage_stats.memory_usage;
400                total_disk += storage_stats.disk_usage.unwrap_or(0);
401                total_keys += storage_stats.total_keys;
402            }
403        }
404        
405        stats.memory_usage = total_memory;
406        stats.disk_usage = total_disk;
407        stats.total_keys = total_keys;
408        
409        Ok(stats)
410    }
411
412    /// Flush all pending writes
413    /// 
414    /// # Errors
415    /// Returns error if flush operation fails
416    #[allow(clippy::significant_drop_tightening)]
417    pub async fn flush(&self) -> KVResult<()> {
418        let storages = self.storages.read().await;
419        for (database_id, storage) in storages.iter() {
420            if let Err(e) = storage.flush().await {
421                error!("Failed to flush database {}: {}", database_id, e);
422            }
423        }
424        Ok(())
425    }
426
427    /// Close the engine and cleanup resources
428    /// 
429    /// # Errors
430    /// Returns error if cleanup fails
431    #[allow(clippy::significant_drop_tightening)]
432    pub async fn close(&self) -> KVResult<()> {
433        info!("Closing KV engine");
434        
435        // Stop TTL cleanup
436        let mut ttl_manager = self.ttl_manager.write().await;
437        ttl_manager.stop_cleanup();
438        
439        // Stop Pub/Sub cleanup
440        let mut pubsub_manager = self.pubsub_manager.write().await;
441        pubsub_manager.stop_cleanup();
442        
443        // Close all storages
444        let storages = self.storages.read().await;
445        for (database_id, storage) in storages.iter() {
446            if let Err(e) = storage.close().await {
447                error!("Failed to close storage for database {}: {}", database_id, e);
448            }
449        }
450        
451        info!("KV engine closed");
452        Ok(())
453    }
454
455    // Pub/Sub operations
456
457    /// Publish a message to a channel
458    /// 
459    /// # Errors
460    /// Returns error if publishing fails
461    pub async fn publish(&self, channel: &str, message: Value) -> KVResult<usize> {
462        let pubsub_manager = self.pubsub_manager.read().await;
463        pubsub_manager.publish(channel, message).await
464    }
465
466    /// Subscribe to a channel pattern
467    /// 
468    /// # Errors
469    /// Returns error if subscription fails
470    pub async fn subscribe(&self, pattern: ChannelPattern) -> KVResult<tokio::sync::mpsc::UnboundedReceiver<crate::PubSubMessage>> {
471        let pubsub_manager = self.pubsub_manager.read().await;
472        pubsub_manager.subscribe(pattern).await
473    }
474
475    /// Unsubscribe from a channel pattern
476    /// 
477    /// # Errors
478    /// Returns error if unsubscription fails
479    pub async fn unsubscribe(&self, pattern: &ChannelPattern) -> KVResult<usize> {
480        let pubsub_manager = self.pubsub_manager.read().await;
481        pubsub_manager.unsubscribe(pattern).await
482    }
483
484    /// Subscribe to cache invalidation events
485    /// 
486    /// # Errors
487    /// Returns error if subscription fails
488    pub async fn subscribe_to_invalidations(&self) -> KVResult<tokio::sync::mpsc::UnboundedReceiver<crate::PubSubMessage>> {
489        let pattern = ChannelPattern::wildcard("cache:invalidate:*".to_string());
490        self.subscribe(pattern).await
491    }
492
493    /// Publish cache invalidation event
494    /// 
495    /// # Errors
496    /// Returns error if publishing fails
497    async fn publish_invalidation(&self, key: &Key) -> KVResult<usize> {
498        let channel = format!("cache:invalidate:{}", key);
499        let message = Value::String(format!("invalidate:{}", key));
500        self.publish(&channel, message).await
501    }
502}
503
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    use crate::PersistenceMode;

    /// Build an in-memory engine for a test.
    ///
    /// The `TempDir` guard is returned together with the engine: dropping it
    /// removes the directory, so it has to outlive the engine whose
    /// `data_dir` points at it.
    async fn create_test_engine() -> (KVEngine, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = KVConfig {
            master_key: String::new(), // Empty string will generate a random key
            persistence_mode: PersistenceMode::Memory,
            data_dir: temp_dir.path().to_string_lossy().to_string(),
            ..Default::default()
        };

        let engine = KVEngine::new(config).await.unwrap();
        (engine, temp_dir)
    }

    /// Round trip through set, get, exists and delete on one key.
    #[tokio::test]
    async fn test_basic_operations() {
        let (engine, _temp_dir) = create_test_engine().await;
        let database_id = 0;
        let key = "test_key".to_string();

        // Test set and get
        let value = Value::String("test_value".to_string());
        engine.set(database_id, key.clone(), value.clone(), None).await.unwrap();

        let retrieved = engine.get(database_id, &key).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().as_string().unwrap(), "test_value");

        // Test exists
        let exists = engine.exists(database_id, &key).await.unwrap();
        assert!(exists);

        // Test delete
        let deleted = engine.delete(database_id, &key).await.unwrap();
        assert!(deleted);

        let exists_after = engine.exists(database_id, &key).await.unwrap();
        assert!(!exists_after);
    }

    /// A TTL given at `set` time is reported, and `expire` replaces it.
    #[tokio::test]
    async fn test_ttl_operations() {
        let (engine, _temp_dir) = create_test_engine().await;
        let database_id = 0;
        let key = "ttl_key".to_string();

        // Set key with TTL
        let value = Value::String("ttl_value".to_string());
        engine.set(database_id, key.clone(), value, Some(60)).await.unwrap();

        // Check TTL
        let ttl = engine.ttl(database_id, &key).await.unwrap();
        assert!(ttl.is_some());
        assert!(ttl.unwrap() <= 60);

        // Set TTL on existing key
        let set_ttl = engine.expire(database_id, &key, 120).await.unwrap();
        assert!(set_ttl);

        let new_ttl = engine.ttl(database_id, &key).await.unwrap();
        assert!(new_ttl.is_some());
        assert!(new_ttl.unwrap() <= 120);
    }

    /// `keys` lists everything; `keys_pattern` filters by pattern.
    #[tokio::test]
    async fn test_keys_operations() {
        let (engine, _temp_dir) = create_test_engine().await;
        let database_id = 0;

        // Add some keys
        let value = Value::String("value".to_string());
        engine.set(database_id, "key1".to_string(), value.clone(), None).await.unwrap();
        engine.set(database_id, "key2".to_string(), value.clone(), None).await.unwrap();
        engine.set(database_id, "test_key".to_string(), value, None).await.unwrap();

        // Get all keys
        let keys = engine.keys(database_id).await.unwrap();
        assert_eq!(keys.len(), 3);
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));
        assert!(keys.contains(&"test_key".to_string()));

        // Get keys with pattern
        let test_keys = engine.keys_pattern(database_id, "key*").await.unwrap();
        assert_eq!(test_keys.len(), 2);
        assert!(test_keys.contains(&"key1".to_string()));
        assert!(test_keys.contains(&"key2".to_string()));
    }

    /// Clearing a database leaves it without keys.
    #[tokio::test]
    async fn test_clear_database() {
        let (engine, _temp_dir) = create_test_engine().await;
        let database_id = 0;

        // Add some keys
        let value = Value::String("value".to_string());
        engine.set(database_id, "key1".to_string(), value.clone(), None).await.unwrap();
        engine.set(database_id, "key2".to_string(), value, None).await.unwrap();

        // Clear database
        engine.clear_database(database_id).await.unwrap();

        // Check keys are gone
        let keys = engine.keys(database_id).await.unwrap();
        assert!(keys.is_empty());
    }

    /// Statistics reflect the stored keys and the operations performed.
    #[tokio::test]
    async fn test_stats() {
        let (engine, _temp_dir) = create_test_engine().await;
        let database_id = 0;

        // Add some data
        let value = Value::String("value".to_string());
        engine.set(database_id, "key1".to_string(), value.clone(), None).await.unwrap();
        engine.set(database_id, "key2".to_string(), value, None).await.unwrap();

        let stats = engine.get_stats().await.unwrap();
        assert_eq!(stats.total_keys, 2);
        // Each `set` counts as one operation
        assert!(stats.total_operations >= 2);
        // Uptime is an unsigned number of whole seconds and may be 0 in a
        // fast test, so there is nothing meaningful to assert on it; the
        // derived rate must still be a sane number.
        assert!(stats.ops_per_second.is_finite());
        assert!(stats.ops_per_second >= 0.0);
    }
}