1use crate::storage::aof::AOFStorage;
6use crate::storage::disk::DiskStorage;
7use crate::storage::hybrid::HybridStorage;
8use crate::storage::memory::MemoryStorage;
9use crate::storage::{PersistenceMode, Storage, StorageConfig, StorageStats};
10use crate::ttl::{TTLConfig, TTLManager, TTLStats};
11use crate::types::MapletConfig;
12use crate::{Maplet, MapletResult, MergeOperator};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15use std::time::SystemTime;
16use tokio::sync::RwLock;
17
18#[derive(Debug, Clone, Default)]
20pub struct ReplaceOperator;
21
22impl MergeOperator<Vec<u8>> for ReplaceOperator {
23 fn merge(&self, _existing: Vec<u8>, new: Vec<u8>) -> MapletResult<Vec<u8>> {
24 Ok(new)
25 }
26
27 fn identity(&self) -> Vec<u8> {
28 Vec::new()
29 }
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct EngineConfig {
35 pub maplet: MapletConfig,
37 pub storage: StorageConfig,
39 pub ttl: TTLConfig,
41 pub persistence_mode: PersistenceMode,
43 pub data_dir: Option<String>,
45}
46
47impl Default for EngineConfig {
48 fn default() -> Self {
49 Self {
50 maplet: MapletConfig::default(),
51 storage: StorageConfig::default(),
52 ttl: TTLConfig::default(),
53 persistence_mode: PersistenceMode::Memory,
54 data_dir: None,
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct EngineStats {
62 pub maplet_stats: crate::MapletStats,
64 pub storage_stats: StorageStats,
66 pub ttl_stats: TTLStats,
68 pub uptime_seconds: u64,
70 pub total_operations: u64,
72}
73
74pub struct Engine {
76 maplet: Arc<RwLock<Maplet<String, Vec<u8>, ReplaceOperator>>>,
78 storage: Arc<dyn Storage>,
80 ttl_manager: Arc<TTLManager>,
82 config: EngineConfig,
84 start_time: SystemTime,
86 operation_count: Arc<RwLock<u64>>,
88}
89
90impl Engine {
91 pub async fn new(config: EngineConfig) -> MapletResult<Self> {
93 let maplet = Arc::new(RwLock::new(
94 Maplet::<String, Vec<u8>, ReplaceOperator>::with_config(config.maplet.clone())?,
95 ));
96
97 let storage: Arc<dyn Storage> = match config.persistence_mode {
98 PersistenceMode::Memory => Arc::new(MemoryStorage::new(config.storage.clone())?),
99 PersistenceMode::Disk => Arc::new(DiskStorage::new(config.storage.clone())?),
100 PersistenceMode::AOF => Arc::new(AOFStorage::new(config.storage.clone())?),
101 PersistenceMode::Hybrid => Arc::new(HybridStorage::new(config.storage.clone())?),
102 };
103
104 let ttl_manager = Arc::new(TTLManager::new(config.ttl.clone()));
106
107 let storage_clone = Arc::clone(&storage);
109 ttl_manager
110 .start_cleanup(move |expired_entries| {
111 let storage = Arc::clone(&storage_clone);
112
113 tokio::spawn(async move {
115 for entry in expired_entries {
116 let _ = storage.delete(&entry.key).await;
118 }
119 });
120
121 Ok(())
122 })
123 .await?;
124
125 Ok(Self {
126 maplet,
127 storage,
128 ttl_manager,
129 config,
130 start_time: SystemTime::now(),
131 operation_count: Arc::new(RwLock::new(0)),
132 })
133 }
134
135 pub async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
137 if self.ttl_manager.is_expired(key).await? {
139 self.ttl_manager.remove_ttl(key).await?;
141 let _ = self.storage.delete(key).await;
142 return Ok(None);
143 }
144
145 let maplet_guard = self.maplet.read().await;
147 if !maplet_guard.contains(&key.to_string()).await {
148 drop(maplet_guard);
149 return Ok(None);
150 }
151 drop(maplet_guard);
152
153 let result = self.storage.get(key).await;
155
156 {
158 let mut count = self.operation_count.write().await;
159 *count += 1;
160 }
161
162 result
163 }
164
165 pub async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
167 self.maplet.write().await.insert(key.clone(), value.clone()).await?;
169
170 self.storage.set(key, value).await?;
172
173 {
175 let mut count = self.operation_count.write().await;
176 *count += 1;
177 }
178
179 Ok(())
180 }
181
182 pub async fn delete(&self, key: &str) -> MapletResult<bool> {
184 let result = self.storage.delete(key).await?;
186
187 {
193 let mut count = self.operation_count.write().await;
194 *count += 1;
195 }
196
197 Ok(result)
198 }
199
200 pub async fn exists(&self, key: &str) -> MapletResult<bool> {
202 let maplet_guard = self.maplet.read().await;
204 if !maplet_guard.contains(&key.to_string()).await {
205 drop(maplet_guard);
206 return Ok(false);
207 }
208 drop(maplet_guard);
209
210 let result = self.storage.exists(key).await?;
212
213 {
215 let mut count = self.operation_count.write().await;
216 *count += 1;
217 }
218
219 Ok(result)
220 }
221
222 pub async fn keys(&self) -> MapletResult<Vec<String>> {
224 let result = self.storage.keys().await?;
225
226 {
228 let mut count = self.operation_count.write().await;
229 *count += 1;
230 }
231
232 Ok(result)
233 }
234
235 pub async fn clear(&self) -> MapletResult<()> {
237 {
239 let mut maplet_guard = self.maplet.write().await;
240 *maplet_guard = Maplet::<String, Vec<u8>, ReplaceOperator>::with_config(
242 self.config.maplet.clone(),
243 )?;
244 }
245
246 self.storage.clear_database().await?;
248
249 {
251 let mut count = self.operation_count.write().await;
252 *count = 0;
253 }
254
255 Ok(())
256 }
257
258 pub async fn flush(&self) -> MapletResult<()> {
260 self.storage.flush().await?;
261 Ok(())
262 }
263
264 pub async fn close(&self) -> MapletResult<()> {
266 self.ttl_manager.stop_cleanup().await?;
268
269 self.storage.close().await?;
271 Ok(())
272 }
273
274 pub async fn stats(&self) -> MapletResult<EngineStats> {
276 let maplet_guard = self.maplet.read().await;
277 let maplet_stats = maplet_guard.stats().await;
278 drop(maplet_guard);
279
280 let storage_stats = self.storage.stats().await?;
281 let ttl_stats = self.ttl_manager.get_stats().await?;
282 let operation_count = *self.operation_count.read().await;
283
284 let uptime = self.start_time.elapsed().unwrap_or_default().as_secs();
285
286 Ok(EngineStats {
287 maplet_stats,
288 storage_stats,
289 ttl_stats,
290 uptime_seconds: uptime,
291 total_operations: operation_count,
292 })
293 }
294
295 pub async fn memory_usage(&self) -> MapletResult<u64> {
297 let storage_stats = self.storage.stats().await?;
298 Ok(storage_stats.memory_usage)
299 }
300
301 #[must_use]
303 pub const fn persistence_mode(&self) -> PersistenceMode {
304 self.config.persistence_mode
305 }
306
307 #[must_use]
309 pub const fn config(&self) -> &EngineConfig {
310 &self.config
311 }
312
313 pub async fn expire(&self, key: &str, ttl_seconds: u64) -> MapletResult<bool> {
315 if !self.exists(key).await? {
317 return Ok(false);
318 }
319
320 self.ttl_manager
322 .set_ttl(key.to_string(), 0, ttl_seconds)
323 .await?;
324
325 {
327 let mut count = self.operation_count.write().await;
328 *count += 1;
329 }
330
331 Ok(true)
332 }
333
334 pub async fn ttl(&self, key: &str) -> MapletResult<Option<i64>> {
336 let result = self.ttl_manager.get_ttl(key).await?;
337
338 {
340 let mut count = self.operation_count.write().await;
341 *count += 1;
342 }
343
344 Ok(result)
345 }
346
347 pub async fn persist(&self, key: &str) -> MapletResult<bool> {
349 let had_ttl = self.ttl_manager.get_ttl(key).await?.is_some();
350 self.ttl_manager.remove_ttl(key).await?;
351
352 {
354 let mut count = self.operation_count.write().await;
355 *count += 1;
356 }
357
358 Ok(had_ttl)
359 }
360
/// Forwards to the maplet's `find_slot_for_key` for `key`.
///
/// Only compiled when the `quotient-filter` feature is enabled. Does not
/// touch the operation counter.
#[cfg(feature = "quotient-filter")]
pub async fn find_slot_for_key(&self, key: &str) -> MapletResult<Option<usize>> {
    let owned_key = key.to_string();

    let maplet = self.maplet.read().await;
    let slot = maplet.find_slot_for_key(&owned_key).await;
    drop(maplet);

    Ok(slot)
}
369}
370
371#[cfg(test)]
372mod tests {
373 use super::*;
374 use tempfile::TempDir;
375
376 #[tokio::test]
377 async fn test_engine_creation() {
378 let config = EngineConfig::default();
379 let engine = Engine::new(config).await.unwrap();
380 assert_eq!(engine.persistence_mode(), PersistenceMode::Memory);
381 }
382
383 #[tokio::test]
384 async fn test_engine_basic_operations() {
385 let config = EngineConfig::default();
386 let engine = Engine::new(config).await.unwrap();
387
388 engine
390 .set("key1".to_string(), b"value1".to_vec())
391 .await
392 .unwrap();
393 let value = engine.get("key1").await.unwrap();
394 assert_eq!(value, Some(b"value1".to_vec()));
395
396 assert!(engine.exists("key1").await.unwrap());
398 assert!(!engine.exists("nonexistent").await.unwrap());
399
400 let deleted = engine.delete("key1").await.unwrap();
402 assert!(deleted);
403 assert!(!engine.exists("key1").await.unwrap());
404 }
405
406 #[tokio::test]
407 async fn test_engine_with_disk_storage() {
408 let temp_dir = TempDir::new().unwrap();
409 let data_dir = temp_dir
411 .path()
412 .join("disk_test")
413 .to_string_lossy()
414 .to_string();
415 let config = EngineConfig {
416 persistence_mode: PersistenceMode::Disk,
417 data_dir: Some(data_dir),
418 ..Default::default()
419 };
420
421 let engine = Engine::new(config).await.unwrap();
422
423 engine
425 .set("key1".to_string(), b"value1".to_vec())
426 .await
427 .unwrap();
428 let value = engine.get("key1").await.unwrap();
429 assert_eq!(value, Some(b"value1".to_vec()));
430
431 engine.close().await.unwrap();
432 }
433
434 #[tokio::test]
435 async fn test_engine_disk_persistence_behavior() {
436 use std::time::Duration;
451 use tokio::time::sleep;
452
453 let temp_dir = TempDir::new().unwrap();
454 let data_dir = temp_dir
455 .path()
456 .join("test_db")
457 .to_string_lossy()
458 .to_string();
459
460 {
462 let config1 = EngineConfig {
463 persistence_mode: PersistenceMode::Disk,
464 data_dir: Some(data_dir.clone()),
465 ..Default::default()
466 };
467
468 let engine1 = Engine::new(config1).await.unwrap();
469 engine1
470 .set("key1".to_string(), b"value1".to_vec())
471 .await
472 .unwrap();
473 engine1
474 .set("key2".to_string(), b"value2".to_vec())
475 .await
476 .unwrap();
477
478 assert_eq!(engine1.get("key1").await.unwrap(), Some(b"value1".to_vec()));
480 assert_eq!(engine1.get("key2").await.unwrap(), Some(b"value2".to_vec()));
481
482 engine1.flush().await.unwrap();
484
485 engine1.close().await.unwrap();
487 } let mut engine2_opt = None;
492 for attempt in 0..5 {
493 sleep(Duration::from_millis(500 * (attempt + 1))).await;
494
495 let config2 = EngineConfig {
496 persistence_mode: PersistenceMode::Disk,
497 data_dir: Some(data_dir.clone()),
498 ..Default::default()
499 };
500
501 match Engine::new(config2).await {
502 Ok(engine) => {
503 engine2_opt = Some(engine);
504 break;
505 }
506 Err(e) => {
507 if attempt < 4 && e.to_string().contains("could not acquire lock") {
508 continue; }
510 panic!("Failed to create engine2 after retries: {}", e);
511 }
512 }
513 }
514
515 let engine2 = engine2_opt.expect("Failed to create engine2 after all retries");
516
517 let keys = engine2.keys().await.unwrap();
519 assert!(
520 keys.contains(&"key1".to_string()),
521 "key1 should exist in storage"
522 );
523 assert!(
524 keys.contains(&"key2".to_string()),
525 "key2 should exist in storage"
526 );
527
528 let value1 = engine2.get("key1").await.unwrap();
536 let value2 = engine2.get("key2").await.unwrap();
537
538 assert_eq!(value1, None, "get() returns None for keys not in maplet");
540 assert_eq!(value2, None, "get() returns None for keys not in maplet");
541
542 engine2
544 .set("key1".to_string(), b"value1_updated".to_vec())
545 .await
546 .unwrap();
547 let value1_after_reinsert = engine2.get("key1").await.unwrap();
548 assert_eq!(value1_after_reinsert, Some(b"value1_updated".to_vec()));
549
550 engine2.close().await.unwrap();
551 }
552
553 #[tokio::test]
554 async fn test_engine_stats() {
555 let config = EngineConfig::default();
556 let engine = Engine::new(config).await.unwrap();
557
558 engine
560 .set("key1".to_string(), b"value1".to_vec())
561 .await
562 .unwrap();
563 engine.get("key1").await.unwrap();
564
565 let stats = engine.stats().await.unwrap();
566 assert!(stats.total_operations > 0);
567 assert!(stats.uptime_seconds >= 0); }
569
570 #[tokio::test]
571 async fn test_engine_ttl_operations() {
572 let config = EngineConfig::default();
573 let engine = Engine::new(config).await.unwrap();
574
575 engine
577 .set("key1".to_string(), b"value1".to_vec())
578 .await
579 .unwrap();
580
581 let result = engine.expire("key1", 60).await.unwrap();
583 assert!(result);
584
585 let ttl = engine.ttl("key1").await.unwrap();
587 assert!(ttl.is_some());
588 assert!(ttl.unwrap() <= 60);
589
590 let had_ttl = engine.persist("key1").await.unwrap();
592 assert!(had_ttl);
593
594 let ttl = engine.ttl("key1").await.unwrap();
596 assert!(ttl.is_none());
597 }
598
599 #[tokio::test]
600 async fn test_engine_ttl_expiration() {
601 let config = EngineConfig::default();
602 let engine = Engine::new(config).await.unwrap();
603
604 engine
606 .set("key1".to_string(), b"value1".to_vec())
607 .await
608 .unwrap();
609 engine.expire("key1", 1).await.unwrap();
610
611 tokio::time::sleep(tokio::time::Duration::from_millis(1100)).await;
613
614 let value = engine.get("key1").await.unwrap();
616 assert!(value.is_none());
617 }
618}