1use crate::{Maplet, MapletResult, MergeOperator};
6use crate::types::MapletConfig;
7use crate::storage::{Storage, StorageStats, StorageConfig, PersistenceMode};
8use crate::storage::memory::MemoryStorage;
9use crate::storage::disk::DiskStorage;
10use crate::storage::aof::AOFStorage;
11use crate::storage::hybrid::HybridStorage;
12use crate::ttl::{TTLManager, TTLConfig, TTLStats};
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use serde::{Serialize, Deserialize};
16use std::time::SystemTime;
17
/// Merge operator whose merge result is always the incoming value.
///
/// It carries no state; it is used as the operator type parameter of the
/// engine's maplet.
#[derive(Clone, Debug, Default)]
pub struct ReplaceOperator;
21
22impl MergeOperator<Vec<u8>> for ReplaceOperator {
23 fn merge(&self, _existing: Vec<u8>, new: Vec<u8>) -> MapletResult<Vec<u8>> {
24 Ok(new)
25 }
26
27 fn identity(&self) -> Vec<u8> {
28 Vec::new()
29 }
30}
31
/// Configuration consumed by `Engine::new`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineConfig {
    /// Passed to `Maplet::with_config` when the engine builds its maplet.
    pub maplet: MapletConfig,
    /// Passed to the constructor of whichever storage backend is selected.
    pub storage: StorageConfig,
    /// Passed to `TTLManager::new`.
    pub ttl: TTLConfig,
    /// Selects the storage backend (`Memory`, `Disk`, `AOF` or `Hybrid`).
    pub persistence_mode: PersistenceMode,
    /// Optional data directory.
    // NOTE(review): `Engine::new` in this file never reads this field; the
    // storage backends only receive `storage`. Presumably the directory is
    // also carried by `StorageConfig` — confirm.
    pub data_dir: Option<String>,
}
46
47impl Default for EngineConfig {
48 fn default() -> Self {
49 Self {
50 maplet: MapletConfig::default(),
51 storage: StorageConfig::default(),
52 ttl: TTLConfig::default(),
53 persistence_mode: PersistenceMode::Memory,
54 data_dir: None,
55 }
56 }
57}
58
/// Snapshot returned by `Engine::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineStats {
    /// Statistics reported by the maplet.
    pub maplet_stats: crate::MapletStats,
    /// Statistics reported by the storage backend.
    pub storage_stats: StorageStats,
    /// Statistics reported by the TTL manager.
    pub ttl_stats: TTLStats,
    /// Whole seconds elapsed since the engine was constructed.
    pub uptime_seconds: u64,
    /// Value of the engine's operation counter when the snapshot was taken.
    pub total_operations: u64,
}
73
/// Key-value engine combining a maplet, a storage backend and a TTL manager.
pub struct Engine {
    /// Maplet keyed by `String`; consulted before storage in `get`/`exists`.
    maplet: Arc<RwLock<Maplet<String, Vec<u8>, ReplaceOperator>>>,
    /// Storage backend chosen from `EngineConfig::persistence_mode`.
    storage: Arc<dyn Storage>,
    /// Tracks per-key expiry and runs the cleanup callback installed in `new`.
    ttl_manager: Arc<TTLManager>,
    /// Configuration the engine was built with.
    config: EngineConfig,
    /// Wall-clock time captured at construction; used for uptime.
    start_time: SystemTime,
    /// Running count of operations; reset to zero by `clear`.
    operation_count: Arc<RwLock<u64>>,
}
89
90impl Engine {
91 pub async fn new(config: EngineConfig) -> MapletResult<Self> {
93 let maplet = Arc::new(RwLock::new(
94 Maplet::<String, Vec<u8>, ReplaceOperator>::with_config(config.maplet.clone())?
95 ));
96
97 let storage: Arc<dyn Storage> = match config.persistence_mode {
98 PersistenceMode::Memory => {
99 Arc::new(MemoryStorage::new(config.storage.clone()).await?)
100 },
101 PersistenceMode::Disk => {
102 Arc::new(DiskStorage::new(config.storage.clone()).await?)
103 },
104 PersistenceMode::AOF => {
105 Arc::new(AOFStorage::new(config.storage.clone()).await?)
106 },
107 PersistenceMode::Hybrid => {
108 Arc::new(HybridStorage::new(config.storage.clone()).await?)
109 },
110 };
111
112 let ttl_manager = Arc::new(TTLManager::new(config.ttl.clone()));
114
115 let storage_clone = Arc::clone(&storage);
117 ttl_manager.start_cleanup(move |expired_entries| {
118 let storage = Arc::clone(&storage_clone);
119
120 tokio::spawn(async move {
122 for entry in expired_entries {
123 let _ = storage.delete(&entry.key).await;
125 }
126 });
127
128 Ok(())
129 }).await?;
130
131 Ok(Self {
132 maplet,
133 storage,
134 ttl_manager,
135 config,
136 start_time: SystemTime::now(),
137 operation_count: Arc::new(RwLock::new(0)),
138 })
139 }
140
141 pub async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
143 if self.ttl_manager.is_expired(key).await? {
145 self.ttl_manager.remove_ttl(key).await?;
147 let _ = self.storage.delete(key).await;
148 return Ok(None);
149 }
150
151 let maplet_guard = self.maplet.read().await;
153 if !maplet_guard.contains(&key.to_string()).await {
154 drop(maplet_guard);
155 return Ok(None);
156 }
157 drop(maplet_guard);
158
159 let result = self.storage.get(key).await;
161
162 {
164 let mut count = self.operation_count.write().await;
165 *count += 1;
166 }
167
168 result
169 }
170
171 pub async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
173 {
175 let maplet_guard = self.maplet.write().await;
176 maplet_guard.insert(key.clone(), value.clone()).await?;
177 }
178
179 self.storage.set(key, value).await?;
181
182 {
184 let mut count = self.operation_count.write().await;
185 *count += 1;
186 }
187
188 Ok(())
189 }
190
191 pub async fn delete(&self, key: &str) -> MapletResult<bool> {
193 let result = self.storage.delete(key).await?;
195
196 {
202 let mut count = self.operation_count.write().await;
203 *count += 1;
204 }
205
206 Ok(result)
207 }
208
209 pub async fn exists(&self, key: &str) -> MapletResult<bool> {
211 let maplet_guard = self.maplet.read().await;
213 if !maplet_guard.contains(&key.to_string()).await {
214 drop(maplet_guard);
215 return Ok(false);
216 }
217 drop(maplet_guard);
218
219 let result = self.storage.exists(key).await?;
221
222 {
224 let mut count = self.operation_count.write().await;
225 *count += 1;
226 }
227
228 Ok(result)
229 }
230
231 pub async fn keys(&self) -> MapletResult<Vec<String>> {
233 let result = self.storage.keys().await?;
234
235 {
237 let mut count = self.operation_count.write().await;
238 *count += 1;
239 }
240
241 Ok(result)
242 }
243
244 pub async fn clear(&self) -> MapletResult<()> {
246 {
248 let mut maplet_guard = self.maplet.write().await;
249 *maplet_guard = Maplet::<String, Vec<u8>, ReplaceOperator>::with_config(self.config.maplet.clone())?;
251 }
252
253 self.storage.clear_database().await?;
255
256 {
258 let mut count = self.operation_count.write().await;
259 *count = 0;
260 }
261
262 Ok(())
263 }
264
265 pub async fn flush(&self) -> MapletResult<()> {
267 self.storage.flush().await?;
268 Ok(())
269 }
270
271 pub async fn close(&self) -> MapletResult<()> {
273 self.ttl_manager.stop_cleanup().await?;
275
276 self.storage.close().await?;
278 Ok(())
279 }
280
281 pub async fn stats(&self) -> MapletResult<EngineStats> {
283 let maplet_guard = self.maplet.read().await;
284 let maplet_stats = maplet_guard.stats().await;
285 drop(maplet_guard);
286
287 let storage_stats = self.storage.stats().await?;
288 let ttl_stats = self.ttl_manager.get_stats().await?;
289 let operation_count = *self.operation_count.read().await;
290
291 let uptime = self.start_time.elapsed()
292 .unwrap_or_default()
293 .as_secs();
294
295 Ok(EngineStats {
296 maplet_stats,
297 storage_stats,
298 ttl_stats,
299 uptime_seconds: uptime,
300 total_operations: operation_count,
301 })
302 }
303
304 pub async fn memory_usage(&self) -> MapletResult<u64> {
306 let storage_stats = self.storage.stats().await?;
307 Ok(storage_stats.memory_usage)
308 }
309
310 pub fn persistence_mode(&self) -> PersistenceMode {
312 self.config.persistence_mode.clone()
313 }
314
315 pub fn config(&self) -> &EngineConfig {
317 &self.config
318 }
319
320 pub async fn expire(&self, key: &str, ttl_seconds: u64) -> MapletResult<bool> {
322 if !self.exists(key).await? {
324 return Ok(false);
325 }
326
327 self.ttl_manager.set_ttl(key.to_string(), 0, ttl_seconds).await?;
329
330 {
332 let mut count = self.operation_count.write().await;
333 *count += 1;
334 }
335
336 Ok(true)
337 }
338
339 pub async fn ttl(&self, key: &str) -> MapletResult<Option<i64>> {
341 let result = self.ttl_manager.get_ttl(key).await?;
342
343 {
345 let mut count = self.operation_count.write().await;
346 *count += 1;
347 }
348
349 Ok(result)
350 }
351
352 pub async fn persist(&self, key: &str) -> MapletResult<bool> {
354 let had_ttl = self.ttl_manager.get_ttl(key).await?.is_some();
355 self.ttl_manager.remove_ttl(key).await?;
356
357 {
359 let mut count = self.operation_count.write().await;
360 *count += 1;
361 }
362
363 Ok(had_ttl)
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // A default configuration yields an in-memory engine.
    #[tokio::test]
    async fn test_engine_creation() {
        let config = EngineConfig::default();
        let engine = Engine::new(config).await.unwrap();
        assert_eq!(engine.persistence_mode(), PersistenceMode::Memory);
    }

    // Round-trips a value through set/get/exists/delete.
    #[tokio::test]
    async fn test_engine_basic_operations() {
        let config = EngineConfig::default();
        let engine = Engine::new(config).await.unwrap();

        engine
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();

        let value = engine.get("key1").await.unwrap();
        assert_eq!(value, Some(b"value1".to_vec()));

        assert!(engine.exists("key1").await.unwrap());
        assert!(!engine.exists("nonexistent").await.unwrap());

        let deleted = engine.delete("key1").await.unwrap();
        assert!(deleted);
        assert!(!engine.exists("key1").await.unwrap());
    }

    // Same round trip as above, but against the disk backend.
    #[tokio::test]
    async fn test_engine_with_disk_storage() {
        let temp_dir = TempDir::new().unwrap();
        let config = EngineConfig {
            persistence_mode: PersistenceMode::Disk,
            data_dir: Some(temp_dir.path().to_string_lossy().to_string()),
            ..Default::default()
        };

        let engine = Engine::new(config).await.unwrap();

        engine
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();

        let value = engine.get("key1").await.unwrap();
        assert_eq!(value, Some(b"value1".to_vec()));
    }

    // One successful `set` and one successful `get` are each counted once.
    #[tokio::test]
    async fn test_engine_stats() {
        let config = EngineConfig::default();
        let engine = Engine::new(config).await.unwrap();

        engine
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();
        engine.get("key1").await.unwrap();

        let stats = engine.stats().await.unwrap();
        // `uptime_seconds` is a `u64`, so a `>= 0` check on it would always
        // hold; the operation count is the meaningful assertion here.
        assert_eq!(stats.total_operations, 2);
    }

    // expire / ttl / persist on a live key.
    #[tokio::test]
    async fn test_engine_ttl_operations() {
        let config = EngineConfig::default();
        let engine = Engine::new(config).await.unwrap();

        engine
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();

        let result = engine.expire("key1", 60).await.unwrap();
        assert!(result);

        let ttl = engine.ttl("key1").await.unwrap();
        assert!(ttl.is_some());
        assert!(ttl.unwrap() <= 60);

        let had_ttl = engine.persist("key1").await.unwrap();
        assert!(had_ttl);

        let ttl = engine.ttl("key1").await.unwrap();
        assert!(ttl.is_none());
    }

    // A key with a one-second TTL is gone after a little over a second.
    #[tokio::test]
    async fn test_engine_ttl_expiration() {
        let config = EngineConfig::default();
        let engine = Engine::new(config).await.unwrap();

        engine
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();
        engine.expire("key1", 1).await.unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1100)).await;

        let value = engine.get("key1").await.unwrap();
        assert!(value.is_none());
    }
}