1use crate::{Maplet, MapletResult, MergeOperator};
6use crate::types::MapletConfig;
7use crate::storage::{Storage, StorageStats, StorageConfig, PersistenceMode};
8use crate::storage::memory::MemoryStorage;
9use crate::storage::disk::DiskStorage;
10use crate::storage::aof::AOFStorage;
11use crate::storage::hybrid::HybridStorage;
12use crate::ttl::{TTLManager, TTLConfig, TTLStats};
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use serde::{Serialize, Deserialize};
16use std::time::SystemTime;
17
18#[derive(Debug, Clone, Default)]
20pub struct ReplaceOperator;
21
22impl MergeOperator<Vec<u8>> for ReplaceOperator {
23 fn merge(&self, _existing: Vec<u8>, new: Vec<u8>) -> MapletResult<Vec<u8>> {
24 Ok(new)
25 }
26
27 fn identity(&self) -> Vec<u8> {
28 Vec::new()
29 }
30}
31
/// Top-level configuration consumed by `Engine::new`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineConfig {
    /// Passed to `Maplet::with_config` when the engine builds or resets its maplet.
    pub maplet: MapletConfig,
    /// Handed to whichever storage backend `persistence_mode` selects.
    pub storage: StorageConfig,
    /// Passed to `TTLManager::new`.
    pub ttl: TTLConfig,
    /// Selects the storage backend (memory, disk, AOF, or hybrid).
    pub persistence_mode: PersistenceMode,
    /// Optional data directory.
    // NOTE(review): `Engine::new` in this file never reads this field; confirm where it is consumed.
    pub data_dir: Option<String>,
}
46
47impl Default for EngineConfig {
48 fn default() -> Self {
49 Self {
50 maplet: MapletConfig::default(),
51 storage: StorageConfig::default(),
52 ttl: TTLConfig::default(),
53 persistence_mode: PersistenceMode::Memory,
54 data_dir: None,
55 }
56 }
57}
58
/// Snapshot of engine statistics, as assembled by `Engine::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineStats {
    /// Statistics reported by the maplet.
    pub maplet_stats: crate::MapletStats,
    /// Statistics reported by the storage backend.
    pub storage_stats: StorageStats,
    /// Statistics reported by the TTL manager.
    pub ttl_stats: TTLStats,
    /// Whole seconds elapsed since the engine was constructed.
    pub uptime_seconds: u64,
    /// Value of the engine's operation counter at the time of the snapshot.
    pub total_operations: u64,
}
73
/// Key-value engine combining a maplet, a storage backend, and a TTL manager.
pub struct Engine {
    /// Maplet consulted by `get` and `exists` before storage is queried.
    maplet: Arc<RwLock<Maplet<String, Vec<u8>, ReplaceOperator>>>,
    /// Storage backend chosen from `config.persistence_mode` in `new`.
    storage: Arc<dyn Storage>,
    /// Tracks per-key expirations and runs the cleanup callback installed in `new`.
    ttl_manager: Arc<TTLManager>,
    /// Configuration the engine was created with.
    config: EngineConfig,
    /// Wall-clock time captured at construction; used to compute uptime.
    start_time: SystemTime,
    /// Counter bumped by most operations and reset to zero by `clear`.
    operation_count: Arc<RwLock<u64>>,
}
89
90impl Engine {
91 pub async fn new(config: EngineConfig) -> MapletResult<Self> {
93 let maplet = Arc::new(RwLock::new(
94 Maplet::<String, Vec<u8>, ReplaceOperator>::with_config(config.maplet.clone())?
95 ));
96
97 let storage: Arc<dyn Storage> = match config.persistence_mode {
98 PersistenceMode::Memory => {
99 Arc::new(MemoryStorage::new(config.storage.clone())?)
100 },
101 PersistenceMode::Disk => {
102 Arc::new(DiskStorage::new(config.storage.clone())?)
103 },
104 PersistenceMode::AOF => {
105 Arc::new(AOFStorage::new(config.storage.clone())?)
106 },
107 PersistenceMode::Hybrid => {
108 Arc::new(HybridStorage::new(config.storage.clone())?)
109 },
110 };
111
112 let ttl_manager = Arc::new(TTLManager::new(config.ttl.clone()));
114
115 let storage_clone = Arc::clone(&storage);
117 ttl_manager.start_cleanup(move |expired_entries| {
118 let storage = Arc::clone(&storage_clone);
119
120 tokio::spawn(async move {
122 for entry in expired_entries {
123 let _ = storage.delete(&entry.key).await;
125 }
126 });
127
128 Ok(())
129 }).await?;
130
131 Ok(Self {
132 maplet,
133 storage,
134 ttl_manager,
135 config,
136 start_time: SystemTime::now(),
137 operation_count: Arc::new(RwLock::new(0)),
138 })
139 }
140
141 pub async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
143 if self.ttl_manager.is_expired(key).await? {
145 self.ttl_manager.remove_ttl(key).await?;
147 let _ = self.storage.delete(key).await;
148 return Ok(None);
149 }
150
151 let maplet_guard = self.maplet.read().await;
153 if !maplet_guard.contains(&key.to_string()).await {
154 drop(maplet_guard);
155 return Ok(None);
156 }
157 drop(maplet_guard);
158
159 let result = self.storage.get(key).await;
161
162 {
164 let mut count = self.operation_count.write().await;
165 *count += 1;
166 }
167
168 result
169 }
170
171 pub async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
173 {
175 let maplet_guard = self.maplet.write().await;
176 maplet_guard.insert(key.clone(), value.clone()).await?;
177 }
178
179 self.storage.set(key, value).await?;
181
182 {
184 let mut count = self.operation_count.write().await;
185 *count += 1;
186 }
187
188 Ok(())
189 }
190
191 pub async fn delete(&self, key: &str) -> MapletResult<bool> {
193 let result = self.storage.delete(key).await?;
195
196 {
202 let mut count = self.operation_count.write().await;
203 *count += 1;
204 }
205
206 Ok(result)
207 }
208
209 pub async fn exists(&self, key: &str) -> MapletResult<bool> {
211 let maplet_guard = self.maplet.read().await;
213 if !maplet_guard.contains(&key.to_string()).await {
214 drop(maplet_guard);
215 return Ok(false);
216 }
217 drop(maplet_guard);
218
219 let result = self.storage.exists(key).await?;
221
222 {
224 let mut count = self.operation_count.write().await;
225 *count += 1;
226 }
227
228 Ok(result)
229 }
230
231 pub async fn keys(&self) -> MapletResult<Vec<String>> {
233 let result = self.storage.keys().await?;
234
235 {
237 let mut count = self.operation_count.write().await;
238 *count += 1;
239 }
240
241 Ok(result)
242 }
243
244 pub async fn clear(&self) -> MapletResult<()> {
246 {
248 let mut maplet_guard = self.maplet.write().await;
249 *maplet_guard = Maplet::<String, Vec<u8>, ReplaceOperator>::with_config(self.config.maplet.clone())?;
251 }
252
253 self.storage.clear_database().await?;
255
256 {
258 let mut count = self.operation_count.write().await;
259 *count = 0;
260 }
261
262 Ok(())
263 }
264
    /// Asks the storage backend to flush.
    ///
    /// # Errors
    /// Propagates any error the storage backend reports.
    pub async fn flush(&self) -> MapletResult<()> {
        self.storage.flush().await?;
        Ok(())
    }
270
    /// Shuts the engine down: stops the TTL cleanup, then closes the storage backend.
    ///
    /// # Errors
    /// Returns the first error encountered; storage is not closed if stopping
    /// the cleanup fails.
    pub async fn close(&self) -> MapletResult<()> {
        // Presumably stopped first because the cleanup callback installed in
        // `new` deletes from storage.
        self.ttl_manager.stop_cleanup().await?;

        self.storage.close().await?;
        Ok(())
    }
280
281 pub async fn stats(&self) -> MapletResult<EngineStats> {
283 let maplet_guard = self.maplet.read().await;
284 let maplet_stats = maplet_guard.stats().await;
285 drop(maplet_guard);
286
287 let storage_stats = self.storage.stats().await?;
288 let ttl_stats = self.ttl_manager.get_stats().await?;
289 let operation_count = *self.operation_count.read().await;
290
291 let uptime = self.start_time.elapsed()
292 .unwrap_or_default()
293 .as_secs();
294
295 Ok(EngineStats {
296 maplet_stats,
297 storage_stats,
298 ttl_stats,
299 uptime_seconds: uptime,
300 total_operations: operation_count,
301 })
302 }
303
304 pub async fn memory_usage(&self) -> MapletResult<u64> {
306 let storage_stats = self.storage.stats().await?;
307 Ok(storage_stats.memory_usage)
308 }
309
    /// Returns the persistence mode this engine was configured with.
    #[must_use]
    pub fn persistence_mode(&self) -> PersistenceMode {
        self.config.persistence_mode
    }
315
    /// Borrows the configuration this engine was created with.
    #[must_use]
    pub const fn config(&self) -> &EngineConfig {
        &self.config
    }
321
322 pub async fn expire(&self, key: &str, ttl_seconds: u64) -> MapletResult<bool> {
324 if !self.exists(key).await? {
326 return Ok(false);
327 }
328
329 self.ttl_manager.set_ttl(key.to_string(), 0, ttl_seconds).await?;
331
332 {
334 let mut count = self.operation_count.write().await;
335 *count += 1;
336 }
337
338 Ok(true)
339 }
340
341 pub async fn ttl(&self, key: &str) -> MapletResult<Option<i64>> {
343 let result = self.ttl_manager.get_ttl(key).await?;
344
345 {
347 let mut count = self.operation_count.write().await;
348 *count += 1;
349 }
350
351 Ok(result)
352 }
353
354 pub async fn persist(&self, key: &str) -> MapletResult<bool> {
356 let had_ttl = self.ttl_manager.get_ttl(key).await?.is_some();
357 self.ttl_manager.remove_ttl(key).await?;
358
359 {
361 let mut count = self.operation_count.write().await;
362 *count += 1;
363 }
364
365 Ok(had_ttl)
366 }
367
368 #[cfg(feature = "quotient-filter")]
370 pub async fn find_slot_for_key(&self, key: &str) -> MapletResult<Option<usize>> {
371 let maplet_guard = self.maplet.read().await;
372 let result = maplet_guard.find_slot_for_key(&key.to_string()).await;
373 drop(maplet_guard);
374 Ok(result)
375 }
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381 use tempfile::TempDir;
382
383 #[tokio::test]
384 async fn test_engine_creation() {
385 let config = EngineConfig::default();
386 let engine = Engine::new(config).await.unwrap();
387 assert_eq!(engine.persistence_mode(), PersistenceMode::Memory);
388 }
389
390 #[tokio::test]
391 async fn test_engine_basic_operations() {
392 let config = EngineConfig::default();
393 let engine = Engine::new(config).await.unwrap();
394
395 engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
397 let value = engine.get("key1").await.unwrap();
398 assert_eq!(value, Some(b"value1".to_vec()));
399
400 assert!(engine.exists("key1").await.unwrap());
402 assert!(!engine.exists("nonexistent").await.unwrap());
403
404 let deleted = engine.delete("key1").await.unwrap();
406 assert!(deleted);
407 assert!(!engine.exists("key1").await.unwrap());
408 }
409
410 #[tokio::test]
411 async fn test_engine_with_disk_storage() {
412 let temp_dir = TempDir::new().unwrap();
413 let data_dir = temp_dir.path().join("disk_test").to_string_lossy().to_string();
415 let config = EngineConfig {
416 persistence_mode: PersistenceMode::Disk,
417 data_dir: Some(data_dir),
418 ..Default::default()
419 };
420
421 let engine = Engine::new(config).await.unwrap();
422
423 engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
425 let value = engine.get("key1").await.unwrap();
426 assert_eq!(value, Some(b"value1".to_vec()));
427
428 engine.close().await.unwrap();
429 }
430
431 #[tokio::test]
432 async fn test_engine_disk_persistence_behavior() {
433 use std::time::Duration;
448 use tokio::time::sleep;
449
450 let temp_dir = TempDir::new().unwrap();
451 let data_dir = temp_dir.path().join("test_db").to_string_lossy().to_string();
452
453 {
455 let config1 = EngineConfig {
456 persistence_mode: PersistenceMode::Disk,
457 data_dir: Some(data_dir.clone()),
458 ..Default::default()
459 };
460
461 let engine1 = Engine::new(config1).await.unwrap();
462 engine1.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
463 engine1.set("key2".to_string(), b"value2".to_vec()).await.unwrap();
464
465 assert_eq!(engine1.get("key1").await.unwrap(), Some(b"value1".to_vec()));
467 assert_eq!(engine1.get("key2").await.unwrap(), Some(b"value2".to_vec()));
468
469 engine1.flush().await.unwrap();
471
472 engine1.close().await.unwrap();
474 } let mut engine2_opt = None;
479 for attempt in 0..5 {
480 sleep(Duration::from_millis(500 * (attempt + 1))).await;
481
482 let config2 = EngineConfig {
483 persistence_mode: PersistenceMode::Disk,
484 data_dir: Some(data_dir.clone()),
485 ..Default::default()
486 };
487
488 match Engine::new(config2).await {
489 Ok(engine) => {
490 engine2_opt = Some(engine);
491 break;
492 }
493 Err(e) => {
494 if attempt < 4 && e.to_string().contains("could not acquire lock") {
495 continue; } else {
497 panic!("Failed to create engine2 after retries: {}", e);
498 }
499 }
500 }
501 }
502
503 let engine2 = engine2_opt.expect("Failed to create engine2 after all retries");
504
505 let keys = engine2.keys().await.unwrap();
507 assert!(keys.contains(&"key1".to_string()), "key1 should exist in storage");
508 assert!(keys.contains(&"key2".to_string()), "key2 should exist in storage");
509
510 let value1 = engine2.get("key1").await.unwrap();
518 let value2 = engine2.get("key2").await.unwrap();
519
520 assert_eq!(value1, None, "get() returns None for keys not in maplet");
522 assert_eq!(value2, None, "get() returns None for keys not in maplet");
523
524 engine2.set("key1".to_string(), b"value1_updated".to_vec()).await.unwrap();
526 let value1_after_reinsert = engine2.get("key1").await.unwrap();
527 assert_eq!(value1_after_reinsert, Some(b"value1_updated".to_vec()));
528
529 engine2.close().await.unwrap();
530 }
531
532 #[tokio::test]
533 async fn test_engine_stats() {
534 let config = EngineConfig::default();
535 let engine = Engine::new(config).await.unwrap();
536
537 engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
539 engine.get("key1").await.unwrap();
540
541 let stats = engine.stats().await.unwrap();
542 assert!(stats.total_operations > 0);
543 assert!(stats.uptime_seconds >= 0); }
545
546 #[tokio::test]
547 async fn test_engine_ttl_operations() {
548 let config = EngineConfig::default();
549 let engine = Engine::new(config).await.unwrap();
550
551 engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
553
554 let result = engine.expire("key1", 60).await.unwrap();
556 assert!(result);
557
558 let ttl = engine.ttl("key1").await.unwrap();
560 assert!(ttl.is_some());
561 assert!(ttl.unwrap() <= 60);
562
563 let had_ttl = engine.persist("key1").await.unwrap();
565 assert!(had_ttl);
566
567 let ttl = engine.ttl("key1").await.unwrap();
569 assert!(ttl.is_none());
570 }
571
572 #[tokio::test]
573 async fn test_engine_ttl_expiration() {
574 let config = EngineConfig::default();
575 let engine = Engine::new(config).await.unwrap();
576
577 engine.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
579 engine.expire("key1", 1).await.unwrap();
580
581 tokio::time::sleep(tokio::time::Duration::from_millis(1100)).await;
583
584 let value = engine.get("key1").await.unwrap();
586 assert!(value.is_none());
587 }
588}