// alien_bindings/providers/kv/local.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{Binding, Kv, PutOptions, ScanResult};
3use alien_error::{
4    AlienError, Context as _, ContextError as _, IntoAlienError as _, IntoAlienErrorDirect,
5};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13
14/// Local disk-persisted KV implementation using sled embedded database
15///
16/// This provides a persistent, thread-safe, disk-based key-value store that implements
17/// all KV trait features including TTL, conditional puts, and prefix scanning.
18/// Perfect for local development and testing that needs data persistence across restarts.
19#[derive(Debug)]
20pub struct LocalKv {
21    db: Arc<Mutex<sled::Db>>,
22    data_dir: PathBuf,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26struct StoredValue {
27    value: Vec<u8>,
28    expires_at: Option<DateTime<Utc>>,
29}
30
31impl StoredValue {
32    fn new(value: Vec<u8>, ttl: Option<Duration>) -> Self {
33        let expires_at = ttl
34            .map(|duration| Utc::now() + chrono::Duration::from_std(duration).unwrap_or_default());
35
36        Self { value, expires_at }
37    }
38
39    fn is_expired(&self) -> bool {
40        if let Some(expires_at) = self.expires_at {
41            Utc::now() >= expires_at
42        } else {
43            false
44        }
45    }
46}
47
48impl LocalKv {
49    /// Create a new local KV store with the given data directory
50    pub async fn new(data_dir: PathBuf) -> Result<Self> {
51        tracing::debug!(data_dir = %data_dir.display(), "Opening LocalKv database");
52
53        // Ensure the data directory exists
54        if let Some(parent) = data_dir.parent() {
55            tokio::fs::create_dir_all(parent)
56                .await
57                .into_alien_error()
58                .context(ErrorData::LocalFilesystemError {
59                    path: parent.to_string_lossy().to_string(),
60                    operation: "create_dir_all".to_string(),
61                })?;
62        }
63
64        let db =
65            sled::open(&data_dir)
66                .into_alien_error()
67                .context(ErrorData::BindingSetupFailed {
68                    binding_type: "local KV".to_string(),
69                    reason: format!("Failed to open sled database at: {:?}", data_dir),
70                })?;
71
72        tracing::debug!(data_dir = %data_dir.display(), "LocalKv database opened successfully");
73
74        Ok(Self {
75            db: Arc::new(Mutex::new(db)),
76            data_dir,
77        })
78    }
79
80    /// Get the data directory path
81    pub fn data_dir(&self) -> &PathBuf {
82        &self.data_dir
83    }
84
85    /// Get the number of items currently stored (including expired items)
86    /// Useful for testing
87    pub async fn len(&self) -> Result<usize> {
88        let db = self.db.lock().await;
89        Ok(db.len())
90    }
91
92    /// Check if the store is empty (including expired items)
93    /// Useful for testing
94    pub async fn is_empty(&self) -> Result<bool> {
95        let db = self.db.lock().await;
96        Ok(db.is_empty())
97    }
98
99    /// Clear all data from the store
100    /// Useful for testing
101    pub async fn clear(&self) -> Result<()> {
102        let db = self.db.lock().await;
103        db.clear()
104            .into_alien_error()
105            .context(ErrorData::KvOperationFailed {
106                operation: "clear".to_string(),
107                key: "*".to_string(),
108                reason: "Failed to clear local KV store".to_string(),
109            })?;
110        Ok(())
111    }
112
113    /// Get all keys currently in the store (including expired ones)
114    /// Useful for testing and debugging
115    pub async fn keys(&self) -> Result<Vec<String>> {
116        let db = self.db.lock().await;
117        let mut keys = Vec::new();
118
119        for result in db.iter() {
120            let (key, _) = result
121                .into_alien_error()
122                .context(ErrorData::KvOperationFailed {
123                    operation: "scan keys".to_string(),
124                    key: "<unknown>".to_string(),
125                    reason: "Failed to iterate over keys".to_string(),
126                })?;
127
128            let key_str = String::from_utf8(key.to_vec()).into_alien_error().context(
129                ErrorData::KvOperationFailed {
130                    operation: "decode key".to_string(),
131                    key: "<invalid UTF-8>".to_string(),
132                    reason: "Invalid UTF-8 in stored key".to_string(),
133                },
134            )?;
135
136            keys.push(key_str);
137        }
138
139        Ok(keys)
140    }
141
142    /// Validate key constraints using global KV validation
143    fn validate_key(key: &str) -> Result<()> {
144        crate::providers::kv::validate_key(key)
145    }
146
147    /// Validate value constraints using global KV validation
148    fn validate_value(value: &[u8]) -> Result<()> {
149        crate::providers::kv::validate_value(value)
150    }
151
152    /// Serialize a stored value to bytes
153    fn serialize_value(stored_value: &StoredValue) -> Result<Vec<u8>> {
154        serde_json::to_vec(stored_value)
155            .into_alien_error()
156            .context(ErrorData::KvOperationFailed {
157                operation: "serialize value".to_string(),
158                key: "<unknown>".to_string(),
159                reason: "Failed to serialize value to JSON".to_string(),
160            })
161    }
162
163    /// Deserialize bytes to a stored value
164    fn deserialize_value(bytes: &[u8]) -> Result<StoredValue> {
165        serde_json::from_slice(bytes)
166            .into_alien_error()
167            .context(ErrorData::KvOperationFailed {
168                operation: "deserialize value".to_string(),
169                key: "<unknown>".to_string(),
170                reason: "Failed to deserialize value from JSON".to_string(),
171            })
172    }
173}
174
175impl Binding for LocalKv {}
176
177#[async_trait]
178impl Kv for LocalKv {
179    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
180        Self::validate_key(key)?;
181
182        let db = self.db.lock().await;
183
184        let value_bytes = match db.get(key.as_bytes()) {
185            Ok(Some(bytes)) => bytes,
186            Ok(None) => return Ok(None),
187            Err(e) => {
188                return Err(e.into_alien_error().context(ErrorData::KvOperationFailed {
189                    operation: "get".to_string(),
190                    key: key.to_string(),
191                    reason: "Failed to retrieve value from sled database".to_string(),
192                }));
193            }
194        };
195
196        let stored_value = Self::deserialize_value(&value_bytes)?;
197
198        if stored_value.is_expired() {
199            // Lazily remove expired items
200            let _ = db.remove(key.as_bytes());
201            Ok(None)
202        } else {
203            Ok(Some(stored_value.value))
204        }
205    }
206
207    async fn put(&self, key: &str, value: Vec<u8>, options: Option<PutOptions>) -> Result<bool> {
208        Self::validate_key(key)?;
209        Self::validate_value(&value)?;
210
211        let db = self.db.lock().await;
212        let options = options.unwrap_or_default();
213
214        // Handle conditional put (if_not_exists)
215        if options.if_not_exists {
216            if let Some(existing_bytes) =
217                db.get(key.as_bytes())
218                    .into_alien_error()
219                    .context(ErrorData::KvOperationFailed {
220                        operation: "conditional put check".to_string(),
221                        key: key.to_string(),
222                        reason: "Failed to check existing key".to_string(),
223                    })?
224            {
225                // Check if existing value is expired
226                if let Ok(existing_stored) = Self::deserialize_value(&existing_bytes) {
227                    if !existing_stored.is_expired() {
228                        return Ok(false); // Key exists and is not expired
229                    }
230                }
231                // If we can't deserialize or it's expired, we can overwrite
232            }
233        }
234
235        let stored_value = StoredValue::new(value, options.ttl);
236        let serialized = Self::serialize_value(&stored_value)?;
237
238        db.insert(key.as_bytes(), serialized)
239            .into_alien_error()
240            .context(ErrorData::KvOperationFailed {
241                operation: "put".to_string(),
242                key: key.to_string(),
243                reason: "Failed to insert value into sled database".to_string(),
244            })?;
245
246        // Ensure data is persisted to disk without blocking the Tokio thread
247        db.flush_async()
248            .await
249            .into_alien_error()
250            .context(ErrorData::KvOperationFailed {
251                operation: "flush".to_string(),
252                key: key.to_string(),
253                reason: "Failed to flush data to disk".to_string(),
254            })?;
255
256        tracing::info!(key = %key, data_dir = %self.data_dir.display(), "LocalKv::put completed successfully and flushed");
257
258        Ok(true)
259    }
260
261    async fn delete(&self, key: &str) -> Result<()> {
262        Self::validate_key(key)?;
263
264        let db = self.db.lock().await;
265        db.remove(key.as_bytes())
266            .into_alien_error()
267            .context(ErrorData::KvOperationFailed {
268                operation: "delete".to_string(),
269                key: key.to_string(),
270                reason: "Failed to remove key from sled database".to_string(),
271            })?;
272
273        // Ensure deletion is persisted to disk without blocking the Tokio thread
274        db.flush_async()
275            .await
276            .into_alien_error()
277            .context(ErrorData::KvOperationFailed {
278                operation: "flush".to_string(),
279                key: key.to_string(),
280                reason: "Failed to flush deletion to disk".to_string(),
281            })?;
282
283        Ok(())
284    }
285
286    async fn exists(&self, key: &str) -> Result<bool> {
287        Self::validate_key(key)?;
288
289        let db = self.db.lock().await;
290
291        match db.get(key.as_bytes()) {
292            Ok(Some(bytes)) => {
293                let stored_value = Self::deserialize_value(&bytes)?;
294                if stored_value.is_expired() {
295                    // Lazily remove expired items
296                    let _ = db.remove(key.as_bytes());
297                    Ok(false)
298                } else {
299                    Ok(true)
300                }
301            }
302            Ok(None) => Ok(false),
303            Err(e) => Err(e.into_alien_error().context(ErrorData::KvOperationFailed {
304                operation: "exists".to_string(),
305                key: key.to_string(),
306                reason: "Failed to check key existence in sled database".to_string(),
307            })),
308        }
309    }
310
311    async fn scan_prefix(
312        &self,
313        prefix: &str,
314        limit: Option<usize>,
315        cursor: Option<String>,
316    ) -> Result<ScanResult> {
317        Self::validate_key(prefix)?;
318
319        let db = self.db.lock().await;
320
321        // Parse cursor if provided (simple offset-based pagination for local)
322        let start_offset = if let Some(cursor_str) = cursor {
323            cursor_str.parse::<usize>().map_err(|_| {
324                AlienError::new(ErrorData::InvalidInput {
325                    operation_context: "KV scan cursor parsing".to_string(),
326                    details: format!("Invalid cursor format: {}", cursor_str),
327                    field_name: Some("cursor".to_string()),
328                })
329            })?
330        } else {
331            0
332        };
333
334        // Collect matching, non-expired keys
335        let mut matching_items: Vec<(String, Vec<u8>)> = Vec::new();
336
337        for result in db.scan_prefix(prefix.as_bytes()) {
338            let (key_bytes, value_bytes) =
339                result
340                    .into_alien_error()
341                    .context(ErrorData::KvOperationFailed {
342                        operation: "scan_prefix".to_string(),
343                        key: prefix.to_string(),
344                        reason: "Failed to scan prefix in sled database".to_string(),
345                    })?;
346
347            let key = String::from_utf8(key_bytes.to_vec())
348                .into_alien_error()
349                .context(ErrorData::KvOperationFailed {
350                    operation: "decode key".to_string(),
351                    key: prefix.to_string(),
352                    reason: "Invalid UTF-8 in stored key during scan".to_string(),
353                })?;
354
355            if let Ok(stored_value) = Self::deserialize_value(&value_bytes) {
356                if !stored_value.is_expired() {
357                    matching_items.push((key, stored_value.value));
358                }
359            }
360        }
361
362        // Sort for deterministic behavior
363        matching_items.sort_by(|a, b| a.0.cmp(&b.0));
364
365        // Apply pagination
366        let total_items = matching_items.len();
367        let end_offset = start_offset + limit.unwrap_or(total_items);
368
369        let items = matching_items
370            .into_iter()
371            .skip(start_offset)
372            .take(limit.unwrap_or(usize::MAX))
373            .collect::<Vec<_>>();
374
375        // Generate next cursor if there are more items
376        let next_cursor = if end_offset < total_items {
377            Some(end_offset.to_string())
378        } else {
379            None
380        };
381
382        Ok(ScanResult { items, next_cursor })
383    }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::TempDir;
    use tokio::time::sleep;

    /// Build a `LocalKv` rooted in a fresh temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for the duration of the test,
    /// because dropping it deletes the directory that holds the database.
    async fn create_test_kv() -> (LocalKv, TempDir) {
        let dir = tempfile::tempdir().expect("Failed to create temp dir");
        let store = LocalKv::new(dir.path().join("kv.db"))
            .await
            .expect("Failed to create LocalKv");
        (store, dir)
    }

    #[tokio::test]
    async fn test_basic_operations() {
        let (kv, _dir) = create_test_kv().await;

        // Round-trip a value through put/get.
        let inserted = kv
            .put("test_key", b"test_value".to_vec(), None)
            .await
            .unwrap();
        assert!(inserted);
        let fetched = kv.get("test_key").await.unwrap();
        assert_eq!(fetched.as_deref(), Some(&b"test_value"[..]));

        // exists() reports presence and absence.
        let present = kv.exists("test_key").await.unwrap();
        let absent = kv.exists("nonexistent").await.unwrap();
        assert!(present);
        assert!(!absent);

        // After delete the key is gone for both exists() and get().
        kv.delete("test_key").await.unwrap();
        assert!(!kv.exists("test_key").await.unwrap());
        assert!(kv.get("test_key").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_conditional_put() {
        let (kv, _dir) = create_test_kv().await;
        let if_absent = || {
            Some(PutOptions {
                ttl: None,
                if_not_exists: true,
            })
        };

        // The key is new, so the conditional write goes through...
        assert!(kv.put("key", b"value1".to_vec(), if_absent()).await.unwrap());
        // ...but a second conditional write is refused and leaves the value alone.
        assert!(!kv.put("key", b"value2".to_vec(), if_absent()).await.unwrap());
        assert_eq!(kv.get("key").await.unwrap(), Some(b"value1".to_vec()));

        // An unconditional put always overwrites.
        assert!(kv.put("key", b"value3".to_vec(), None).await.unwrap());
        assert_eq!(kv.get("key").await.unwrap(), Some(b"value3".to_vec()));
    }

    #[tokio::test]
    async fn test_ttl_expiration() {
        let (kv, _dir) = create_test_kv().await;
        let ttl = Duration::from_millis(500);

        kv.put(
            "expiring_key",
            b"value".to_vec(),
            Some(PutOptions {
                ttl: Some(ttl),
                if_not_exists: false,
            }),
        )
        .await
        .unwrap();

        // Still live right after the write has completed.
        assert!(kv.exists("expiring_key").await.unwrap());
        assert_eq!(
            kv.get("expiring_key").await.unwrap(),
            Some(b"value".to_vec())
        );

        // Sleep comfortably past the TTL; the entry must now read as absent.
        sleep(Duration::from_millis(750)).await;
        assert!(!kv.exists("expiring_key").await.unwrap());
        assert_eq!(kv.get("expiring_key").await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_prefix_scanning() {
        let (kv, _dir) = create_test_kv().await;

        let fixtures = [
            ("prefix:key1", "value1"),
            ("prefix:key2", "value2"),
            ("prefix:key3", "value3"),
            ("other:key", "other"),
        ];
        for (key, value) in fixtures.iter() {
            kv.put(key, value.as_bytes().to_vec(), None).await.unwrap();
        }

        // An unbounded scan returns every matching key, sorted, with no cursor.
        let all = kv.scan_prefix("prefix:", None, None).await.unwrap();
        let all_keys: Vec<&str> = all.items.iter().map(|(k, _)| k.as_str()).collect();
        assert_eq!(all_keys, ["prefix:key1", "prefix:key2", "prefix:key3"]);
        assert!(all.next_cursor.is_none());

        // With a page size of two, the first page hands back a cursor...
        let first_page = kv.scan_prefix("prefix:", Some(2), None).await.unwrap();
        assert_eq!(first_page.items.len(), 2);
        let cursor = first_page
            .next_cursor
            .expect("first page should hand back a cursor");

        // ...and following it yields the one remaining item and no further cursor.
        let second_page = kv
            .scan_prefix("prefix:", Some(2), Some(cursor))
            .await
            .unwrap();
        assert_eq!(second_page.items.len(), 1);
        assert_eq!(second_page.items[0].0, "prefix:key3");
        assert!(second_page.next_cursor.is_none());
    }

    #[tokio::test]
    async fn test_persistence_across_reopens() {
        let dir = tempfile::tempdir().expect("Failed to create temp dir");
        let db_path = dir.path().join("kv.db");

        // First session: write a value, then close the database by dropping the store.
        let writer = LocalKv::new(db_path.clone())
            .await
            .expect("Failed to create LocalKv");
        writer
            .put("persistent_key", b"persistent_value".to_vec(), None)
            .await
            .unwrap();
        drop(writer);

        // Second session: the value written before the reopen is still there.
        let reader = LocalKv::new(db_path)
            .await
            .expect("Failed to reopen LocalKv");
        assert_eq!(
            reader.get("persistent_key").await.unwrap(),
            Some(b"persistent_value".to_vec())
        );
    }

    #[tokio::test]
    async fn test_key_validation() {
        let (kv, _dir) = create_test_kv().await;

        // The empty key is rejected for both writes and reads.
        assert!(kv.put("", b"value".to_vec(), None).await.is_err());
        assert!(kv.get("").await.is_err());

        // Over-long keys and keys with disallowed characters are rejected.
        let too_long = "a".repeat(513);
        let rejected = [
            too_long.as_str(),
            "key with spaces",
            "key\nwith\nnewlines",
            "key/with/slashes",
        ];
        for key in rejected.iter() {
            assert!(
                kv.put(key, b"value".to_vec(), None).await.is_err(),
                "expected key {:?} to be rejected",
                key
            );
        }

        // Keys like these are accepted.
        for key in ["valid_key-123", "domain.com:8080"].iter() {
            assert!(
                kv.put(key, b"value".to_vec(), None).await.is_ok(),
                "expected key {:?} to be accepted",
                key
            );
        }
    }

    #[tokio::test]
    async fn test_value_validation() {
        let (kv, _dir) = create_test_kv().await;
        const LIMIT: usize = 24 * 1024; // 24 KiB

        // One byte over the limit is rejected...
        assert!(kv.put("key", vec![0u8; LIMIT + 1], None).await.is_err());
        // ...while a value of exactly the limit is accepted.
        assert!(kv.put("key", vec![0u8; LIMIT], None).await.is_ok());
    }

    #[tokio::test]
    async fn test_utility_methods() {
        let (kv, _dir) = create_test_kv().await;

        // A fresh store has nothing in it.
        assert!(kv.is_empty().await.unwrap());
        assert_eq!(kv.len().await.unwrap(), 0);
        assert!(kv.keys().await.unwrap().is_empty());

        // Two writes show up in len() and keys().
        for (key, value) in [("key1", "value1"), ("key2", "value2")].iter() {
            kv.put(key, value.as_bytes().to_vec(), None).await.unwrap();
        }
        assert!(!kv.is_empty().await.unwrap());
        assert_eq!(kv.len().await.unwrap(), 2);

        let mut keys = kv.keys().await.unwrap();
        keys.sort_unstable();
        assert_eq!(keys, ["key1", "key2"]);

        // clear() empties the store again.
        kv.clear().await.unwrap();
        assert!(kv.is_empty().await.unwrap());
        assert_eq!(kv.len().await.unwrap(), 0);
    }
}