1use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{error, info};
9
10use crate::{
11 KVError, KVResult, Key, Value, Entry, DatabaseId,
12 KVConfig, KVStats, TTL, StorageFactory,
13 encryption::KeyManager,
14 ttl::{TTLManager, TTLSupport},
15 storage::Storage as StorageTrait,
16 pubsub::{PubSubManager, ChannelPattern},
17};
18
19pub struct KVEngine {
21 config: KVConfig,
23 storages: Arc<RwLock<HashMap<DatabaseId, Box<dyn StorageTrait>>>>,
25 _key_manager: Arc<RwLock<KeyManager>>,
27 ttl_manager: Arc<RwLock<TTLManager>>,
29 pubsub_manager: Arc<RwLock<PubSubManager>>,
31 stats: Arc<RwLock<KVStats>>,
33 start_time: std::time::Instant,
35}
36
37impl KVEngine {
38 pub async fn new(config: KVConfig) -> KVResult<Self> {
43 info!("Initializing KV engine with config: {:?}", config);
44
45 let key_manager = KeyManager::new(&config.master_key)?;
47
48 let ttl_manager = TTLManager::new(
50 std::time::Duration::from_secs(config.expiration_check_interval)
51 );
52
53 let mut pubsub_manager = PubSubManager::default();
55 pubsub_manager.start_cleanup();
56
57 let engine = Self {
58 config: config.clone(),
59 storages: Arc::new(RwLock::new(HashMap::new())),
60 _key_manager: Arc::new(RwLock::new(key_manager)),
61 ttl_manager: Arc::new(RwLock::new(ttl_manager)),
62 pubsub_manager: Arc::new(RwLock::new(pubsub_manager)),
63 stats: Arc::new(RwLock::new(KVStats {
64 total_keys: 0,
65 expired_keys: 0,
66 memory_usage: 0,
67 disk_usage: 0,
68 total_operations: 0,
69 ops_per_second: 0.0,
70 uptime: 0,
71 active_connections: 0,
72 })),
73 start_time: std::time::Instant::now(),
74 };
75
76 engine.start_ttl_cleanup().await;
78
79 engine.ensure_storage(0).await?;
81
82 info!("KV engine initialized successfully");
83 Ok(engine)
84 }
85
86 #[allow(clippy::significant_drop_tightening)]
88 async fn ensure_storage(&self, database_id: DatabaseId) -> KVResult<()> {
89 let mut storages = self.storages.write().await;
90 if let std::collections::hash_map::Entry::Vacant(e) = storages.entry(database_id) {
91 let storage = StorageFactory::create(
92 self.config.persistence_mode,
93 &self.config.data_dir,
94 database_id,
95 ).await?;
96 e.insert(storage);
97 }
98 Ok(())
99 }
100
101 #[allow(clippy::option_if_let_else)]
103 async fn _get_storage(&self, database_id: DatabaseId) -> KVResult<Arc<Box<dyn StorageTrait>>> {
104 self.ensure_storage(database_id).await?;
105 let storages = self.storages.read().await;
106 if let Some(_storage) = storages.get(&database_id) {
107 Err(KVError::Internal("Storage access not implemented".to_string()))
111 } else {
112 Err(KVError::Internal("Storage not found".to_string()))
113 }
114 }
115
116 async fn start_ttl_cleanup(&self) {
118 let ttl_manager = Arc::clone(&self.ttl_manager);
119 let storages = Arc::clone(&self.storages);
120
121 {
122 let mut ttl_manager_guard = ttl_manager.write().await;
123 ttl_manager_guard.start_cleanup(move |expired_keys| {
124 let storages = Arc::clone(&storages);
125
126 tokio::spawn(async move {
127 for key in expired_keys {
128 let storages_guard = storages.read().await;
130 for (database_id, storage) in storages_guard.iter() {
131 if let Err(e) = storage.delete(*database_id, &key).await {
132 error!("Failed to delete expired key {} from database {}: {}", key, database_id, e);
133 }
134 }
135 }
136 });
137 });
138 }
139 }
140
141 async fn update_stats(&self, operation_count: u64) {
143 let mut stats = self.stats.write().await;
144 stats.total_operations += operation_count;
145 stats.uptime = self.start_time.elapsed().as_secs();
146
147 if stats.uptime > 0 {
148 #[allow(clippy::cast_precision_loss)]
149 let ops_per_second = stats.total_operations as f64 / stats.uptime as f64;
150 stats.ops_per_second = ops_per_second;
151 }
152 }
153
154 #[allow(clippy::significant_drop_tightening)]
161 pub async fn get(&self, database_id: DatabaseId, key: &Key) -> KVResult<Option<Value>> {
162 self.ensure_storage(database_id).await?;
163
164 let storages = self.storages.read().await;
165 let storage = storages.get(&database_id)
166 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
167
168 let entry = storage.get(database_id, key).await?;
169
170 if let Some(mut entry) = entry {
171 if entry.is_expired() {
173 drop(storages); let _ = self.delete(database_id, key).await;
176 return Ok(None);
177 }
178
179 entry.touch();
181
182 drop(storages); let mut storages = self.storages.write().await;
185 let storage = storages.get_mut(&database_id)
186 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
187 storage.set(database_id, key.clone(), entry.clone()).await?;
188
189 self.update_stats(1).await;
190 Ok(Some(entry.value))
191 } else {
192 self.update_stats(1).await;
193 Ok(None)
194 }
195 }
196
197 #[allow(clippy::significant_drop_tightening)]
202 pub async fn set(&self, database_id: DatabaseId, key: Key, value: Value, ttl: Option<TTL>) -> KVResult<()> {
203 self.ensure_storage(database_id).await?;
204
205 let entry = Entry::new(value, ttl);
206
207 let mut storages = self.storages.write().await;
208 let storage = storages.get_mut(&database_id)
209 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
210
211 storage.set(database_id, key.clone(), entry).await?;
212
213 if let Some(ttl) = ttl {
215 let ttl_manager = self.ttl_manager.read().await;
216 ttl_manager.set_ttl(key.clone(), ttl).await?;
217 }
218
219 if let Err(e) = self.publish_invalidation(&key).await {
221 error!("Failed to publish invalidation for key {}: {}", key, e);
222 }
223
224 self.update_stats(1).await;
225 Ok(())
226 }
227
228 #[allow(clippy::significant_drop_tightening)]
233 pub async fn delete(&self, database_id: DatabaseId, key: &Key) -> KVResult<bool> {
234 self.ensure_storage(database_id).await?;
235
236 let mut storages = self.storages.write().await;
237 let storage = storages.get_mut(&database_id)
238 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
239
240 let deleted = storage.delete(database_id, key).await?;
241
242 if deleted {
243 let ttl_manager = self.ttl_manager.read().await;
245 let _ = ttl_manager.remove_ttl(key).await;
246
247 if let Err(e) = self.publish_invalidation(key).await {
249 error!("Failed to publish invalidation for key {}: {}", key, e);
250 }
251 }
252
253 self.update_stats(1).await;
254 Ok(deleted)
255 }
256
257 #[allow(clippy::significant_drop_tightening)]
262 pub async fn exists(&self, database_id: DatabaseId, key: &Key) -> KVResult<bool> {
263 self.ensure_storage(database_id).await?;
264
265 let storages = self.storages.read().await;
266 let storage = storages.get(&database_id)
267 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
268
269 let exists = storage.exists(database_id, key).await?;
270
271 self.update_stats(1).await;
272 Ok(exists)
273 }
274
275 #[allow(clippy::significant_drop_tightening)]
280 pub async fn expire(&self, database_id: DatabaseId, key: &Key, ttl: TTL) -> KVResult<bool> {
281 self.ensure_storage(database_id).await?;
282
283 if !self.exists(database_id, key).await? {
285 return Ok(false);
286 }
287
288 let ttl_manager = self.ttl_manager.read().await;
290 ttl_manager.set_ttl(key.clone(), ttl).await?;
291
292 let storages = self.storages.read().await;
294 let storage = storages.get(&database_id)
295 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
296
297 if let Some(mut entry) = storage.get(database_id, key).await? {
298 entry.set_ttl(ttl);
299 drop(storages); let mut storages = self.storages.write().await;
301 let storage = storages.get_mut(&database_id)
302 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
303 storage.set(database_id, key.clone(), entry).await?;
304 }
305
306 self.update_stats(1).await;
307 Ok(true)
308 }
309
310 #[allow(clippy::significant_drop_tightening)]
315 pub async fn ttl(&self, database_id: DatabaseId, key: &Key) -> KVResult<Option<TTL>> {
316 self.ensure_storage(database_id).await?;
317
318 let ttl_manager = self.ttl_manager.read().await;
319 let ttl = ttl_manager.get_ttl(key).await?;
320
321 self.update_stats(1).await;
322 Ok(ttl)
323 }
324
325 #[allow(clippy::significant_drop_tightening)]
330 pub async fn keys(&self, database_id: DatabaseId) -> KVResult<Vec<Key>> {
331 self.ensure_storage(database_id).await?;
332
333 let storages = self.storages.read().await;
334 let storage = storages.get(&database_id)
335 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
336
337 let keys = storage.keys(database_id).await?;
338
339 self.update_stats(1).await;
340 Ok(keys)
341 }
342
343 #[allow(clippy::significant_drop_tightening)]
348 pub async fn keys_pattern(&self, database_id: DatabaseId, pattern: &str) -> KVResult<Vec<Key>> {
349 self.ensure_storage(database_id).await?;
350
351 let storages = self.storages.read().await;
352 let storage = storages.get(&database_id)
353 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
354
355 let keys = storage.keys_pattern(database_id, pattern).await?;
356
357 self.update_stats(1).await;
358 Ok(keys)
359 }
360
361 #[allow(clippy::significant_drop_tightening)]
366 pub async fn clear_database(&self, database_id: DatabaseId) -> KVResult<()> {
367 self.ensure_storage(database_id).await?;
368
369 let mut storages = self.storages.write().await;
370 let storage = storages.get_mut(&database_id)
371 .ok_or_else(|| KVError::Internal("Storage not found".to_string()))?;
372
373 storage.clear_database(database_id).await?;
374
375 let ttl_manager = self.ttl_manager.read().await;
377 ttl_manager.clear_all().await;
378
379 self.update_stats(1).await;
380 Ok(())
381 }
382
383 #[allow(clippy::significant_drop_tightening)]
388 pub async fn get_stats(&self) -> KVResult<KVStats> {
389 let mut stats = self.stats.read().await.clone();
390
391 let storages = self.storages.read().await;
393 let mut total_memory = 0u64;
394 let mut total_disk = 0u64;
395 let mut total_keys = 0u64;
396
397 for (database_id, storage) in storages.iter() {
398 if let Ok(storage_stats) = storage.get_stats(*database_id).await {
399 total_memory += storage_stats.memory_usage;
400 total_disk += storage_stats.disk_usage.unwrap_or(0);
401 total_keys += storage_stats.total_keys;
402 }
403 }
404
405 stats.memory_usage = total_memory;
406 stats.disk_usage = total_disk;
407 stats.total_keys = total_keys;
408
409 Ok(stats)
410 }
411
412 #[allow(clippy::significant_drop_tightening)]
417 pub async fn flush(&self) -> KVResult<()> {
418 let storages = self.storages.read().await;
419 for (database_id, storage) in storages.iter() {
420 if let Err(e) = storage.flush().await {
421 error!("Failed to flush database {}: {}", database_id, e);
422 }
423 }
424 Ok(())
425 }
426
427 #[allow(clippy::significant_drop_tightening)]
432 pub async fn close(&self) -> KVResult<()> {
433 info!("Closing KV engine");
434
435 let mut ttl_manager = self.ttl_manager.write().await;
437 ttl_manager.stop_cleanup();
438
439 let mut pubsub_manager = self.pubsub_manager.write().await;
441 pubsub_manager.stop_cleanup();
442
443 let storages = self.storages.read().await;
445 for (database_id, storage) in storages.iter() {
446 if let Err(e) = storage.close().await {
447 error!("Failed to close storage for database {}: {}", database_id, e);
448 }
449 }
450
451 info!("KV engine closed");
452 Ok(())
453 }
454
455 pub async fn publish(&self, channel: &str, message: Value) -> KVResult<usize> {
462 let pubsub_manager = self.pubsub_manager.read().await;
463 pubsub_manager.publish(channel, message).await
464 }
465
466 pub async fn subscribe(&self, pattern: ChannelPattern) -> KVResult<tokio::sync::mpsc::UnboundedReceiver<crate::PubSubMessage>> {
471 let pubsub_manager = self.pubsub_manager.read().await;
472 pubsub_manager.subscribe(pattern).await
473 }
474
475 pub async fn unsubscribe(&self, pattern: &ChannelPattern) -> KVResult<usize> {
480 let pubsub_manager = self.pubsub_manager.read().await;
481 pubsub_manager.unsubscribe(pattern).await
482 }
483
484 pub async fn subscribe_to_invalidations(&self) -> KVResult<tokio::sync::mpsc::UnboundedReceiver<crate::PubSubMessage>> {
489 let pattern = ChannelPattern::wildcard("cache:invalidate:*".to_string());
490 self.subscribe(pattern).await
491 }
492
493 async fn publish_invalidation(&self, key: &Key) -> KVResult<usize> {
498 let channel = format!("cache:invalidate:{}", key);
499 let message = Value::String(format!("invalidate:{}", key));
500 self.publish(&channel, message).await
501 }
502}
503
504#[cfg(test)]
505mod tests {
506 use super::*;
507 use tempfile::TempDir;
508 use crate::PersistenceMode;
509
510 async fn create_test_engine() -> KVEngine {
511 let temp_dir = TempDir::new().unwrap();
512 let config = KVConfig {
513 master_key: String::new(), persistence_mode: PersistenceMode::Memory,
515 data_dir: temp_dir.path().to_string_lossy().to_string(),
516 ..Default::default()
517 };
518
519 KVEngine::new(config).await.unwrap()
520 }
521
522 #[tokio::test]
523 async fn test_basic_operations() {
524 let engine = create_test_engine().await;
525 let database_id = 0;
526
527 let value = Value::String("test_value".to_string());
529 engine.set(database_id, "test_key".to_string(), value.clone(), None).await.unwrap();
530
531 let retrieved = engine.get(database_id, &"test_key".to_string()).await.unwrap();
532 assert!(retrieved.is_some());
533 assert_eq!(retrieved.unwrap().as_string().unwrap(), "test_value");
534
535 let exists = engine.exists(database_id, &"test_key".to_string()).await.unwrap();
537 assert!(exists);
538
539 let deleted = engine.delete(database_id, &"test_key".to_string()).await.unwrap();
541 assert!(deleted);
542
543 let exists_after = engine.exists(database_id, &"test_key".to_string()).await.unwrap();
544 assert!(!exists_after);
545 }
546
547 #[tokio::test]
548 async fn test_ttl_operations() {
549 let engine = create_test_engine().await;
550 let database_id = 0;
551
552 let value = Value::String("ttl_value".to_string());
554 engine.set(database_id, "ttl_key".to_string(), value, Some(60)).await.unwrap();
555
556 let ttl = engine.ttl(database_id, &"ttl_key".to_string()).await.unwrap();
558 assert!(ttl.is_some());
559 assert!(ttl.unwrap() <= 60);
560
561 let set_ttl = engine.expire(database_id, &"ttl_key".to_string(), 120).await.unwrap();
563 assert!(set_ttl);
564
565 let new_ttl = engine.ttl(database_id, &"ttl_key".to_string()).await.unwrap();
566 assert!(new_ttl.is_some());
567 assert!(new_ttl.unwrap() <= 120);
568 }
569
570 #[tokio::test]
571 async fn test_keys_operations() {
572 let engine = create_test_engine().await;
573 let database_id = 0;
574
575 let value = Value::String("value".to_string());
577 engine.set(database_id, "key1".to_string(), value.clone(), None).await.unwrap();
578 engine.set(database_id, "key2".to_string(), value.clone(), None).await.unwrap();
579 engine.set(database_id, "test_key".to_string(), value, None).await.unwrap();
580
581 let keys = engine.keys(database_id).await.unwrap();
583 assert_eq!(keys.len(), 3);
584 assert!(keys.contains(&"key1".to_string()));
585 assert!(keys.contains(&"key2".to_string()));
586 assert!(keys.contains(&"test_key".to_string()));
587
588 let test_keys = engine.keys_pattern(database_id, "key*").await.unwrap();
590 assert_eq!(test_keys.len(), 2);
591 assert!(test_keys.contains(&"key1".to_string()));
592 assert!(test_keys.contains(&"key2".to_string()));
593 }
594
595 #[tokio::test]
596 async fn test_clear_database() {
597 let engine = create_test_engine().await;
598 let database_id = 0;
599
600 let value = Value::String("value".to_string());
602 engine.set(database_id, "key1".to_string(), value.clone(), None).await.unwrap();
603 engine.set(database_id, "key2".to_string(), value, None).await.unwrap();
604
605 engine.clear_database(database_id).await.unwrap();
607
608 let keys = engine.keys(database_id).await.unwrap();
610 assert!(keys.is_empty());
611 }
612
613 #[tokio::test]
614 async fn test_stats() {
615 let engine = create_test_engine().await;
616 let database_id = 0;
617
618 let value = Value::String("value".to_string());
620 engine.set(database_id, "key1".to_string(), value.clone(), None).await.unwrap();
621 engine.set(database_id, "key2".to_string(), value, None).await.unwrap();
622
623 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
625
626 let stats = engine.get_stats().await.unwrap();
627 assert_eq!(stats.total_keys, 2);
628 assert!(stats.total_operations > 0);
629 assert!(stats.uptime >= 0);
631 }
632}