1use crate::error::{ErrorData, Result};
2use crate::traits::{Binding, Kv, PutOptions, ScanResult};
3use alien_error::{
4 AlienError, Context as _, ContextError as _, IntoAlienError as _, IntoAlienErrorDirect,
5};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13
14#[derive(Debug)]
20pub struct LocalKv {
21 db: Arc<Mutex<sled::Db>>,
22 data_dir: PathBuf,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26struct StoredValue {
27 value: Vec<u8>,
28 expires_at: Option<DateTime<Utc>>,
29}
30
31impl StoredValue {
32 fn new(value: Vec<u8>, ttl: Option<Duration>) -> Self {
33 let expires_at = ttl
34 .map(|duration| Utc::now() + chrono::Duration::from_std(duration).unwrap_or_default());
35
36 Self { value, expires_at }
37 }
38
39 fn is_expired(&self) -> bool {
40 if let Some(expires_at) = self.expires_at {
41 Utc::now() >= expires_at
42 } else {
43 false
44 }
45 }
46}
47
48impl LocalKv {
49 pub async fn new(data_dir: PathBuf) -> Result<Self> {
51 tracing::debug!(data_dir = %data_dir.display(), "Opening LocalKv database");
52
53 if let Some(parent) = data_dir.parent() {
55 tokio::fs::create_dir_all(parent)
56 .await
57 .into_alien_error()
58 .context(ErrorData::LocalFilesystemError {
59 path: parent.to_string_lossy().to_string(),
60 operation: "create_dir_all".to_string(),
61 })?;
62 }
63
64 let db =
65 sled::open(&data_dir)
66 .into_alien_error()
67 .context(ErrorData::BindingSetupFailed {
68 binding_type: "local KV".to_string(),
69 reason: format!("Failed to open sled database at: {:?}", data_dir),
70 })?;
71
72 tracing::debug!(data_dir = %data_dir.display(), "LocalKv database opened successfully");
73
74 Ok(Self {
75 db: Arc::new(Mutex::new(db)),
76 data_dir,
77 })
78 }
79
80 pub fn data_dir(&self) -> &PathBuf {
82 &self.data_dir
83 }
84
85 pub async fn len(&self) -> Result<usize> {
88 let db = self.db.lock().await;
89 Ok(db.len())
90 }
91
92 pub async fn is_empty(&self) -> Result<bool> {
95 let db = self.db.lock().await;
96 Ok(db.is_empty())
97 }
98
99 pub async fn clear(&self) -> Result<()> {
102 let db = self.db.lock().await;
103 db.clear()
104 .into_alien_error()
105 .context(ErrorData::KvOperationFailed {
106 operation: "clear".to_string(),
107 key: "*".to_string(),
108 reason: "Failed to clear local KV store".to_string(),
109 })?;
110 Ok(())
111 }
112
113 pub async fn keys(&self) -> Result<Vec<String>> {
116 let db = self.db.lock().await;
117 let mut keys = Vec::new();
118
119 for result in db.iter() {
120 let (key, _) = result
121 .into_alien_error()
122 .context(ErrorData::KvOperationFailed {
123 operation: "scan keys".to_string(),
124 key: "<unknown>".to_string(),
125 reason: "Failed to iterate over keys".to_string(),
126 })?;
127
128 let key_str = String::from_utf8(key.to_vec()).into_alien_error().context(
129 ErrorData::KvOperationFailed {
130 operation: "decode key".to_string(),
131 key: "<invalid UTF-8>".to_string(),
132 reason: "Invalid UTF-8 in stored key".to_string(),
133 },
134 )?;
135
136 keys.push(key_str);
137 }
138
139 Ok(keys)
140 }
141
142 fn validate_key(key: &str) -> Result<()> {
144 crate::providers::kv::validate_key(key)
145 }
146
147 fn validate_value(value: &[u8]) -> Result<()> {
149 crate::providers::kv::validate_value(value)
150 }
151
152 fn serialize_value(stored_value: &StoredValue) -> Result<Vec<u8>> {
154 serde_json::to_vec(stored_value)
155 .into_alien_error()
156 .context(ErrorData::KvOperationFailed {
157 operation: "serialize value".to_string(),
158 key: "<unknown>".to_string(),
159 reason: "Failed to serialize value to JSON".to_string(),
160 })
161 }
162
163 fn deserialize_value(bytes: &[u8]) -> Result<StoredValue> {
165 serde_json::from_slice(bytes)
166 .into_alien_error()
167 .context(ErrorData::KvOperationFailed {
168 operation: "deserialize value".to_string(),
169 key: "<unknown>".to_string(),
170 reason: "Failed to deserialize value from JSON".to_string(),
171 })
172 }
173}
174
175impl Binding for LocalKv {}
176
177#[async_trait]
178impl Kv for LocalKv {
179 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
180 Self::validate_key(key)?;
181
182 let db = self.db.lock().await;
183
184 let value_bytes = match db.get(key.as_bytes()) {
185 Ok(Some(bytes)) => bytes,
186 Ok(None) => return Ok(None),
187 Err(e) => {
188 return Err(e.into_alien_error().context(ErrorData::KvOperationFailed {
189 operation: "get".to_string(),
190 key: key.to_string(),
191 reason: "Failed to retrieve value from sled database".to_string(),
192 }));
193 }
194 };
195
196 let stored_value = Self::deserialize_value(&value_bytes)?;
197
198 if stored_value.is_expired() {
199 let _ = db.remove(key.as_bytes());
201 Ok(None)
202 } else {
203 Ok(Some(stored_value.value))
204 }
205 }
206
207 async fn put(&self, key: &str, value: Vec<u8>, options: Option<PutOptions>) -> Result<bool> {
208 Self::validate_key(key)?;
209 Self::validate_value(&value)?;
210
211 let db = self.db.lock().await;
212 let options = options.unwrap_or_default();
213
214 if options.if_not_exists {
216 if let Some(existing_bytes) =
217 db.get(key.as_bytes())
218 .into_alien_error()
219 .context(ErrorData::KvOperationFailed {
220 operation: "conditional put check".to_string(),
221 key: key.to_string(),
222 reason: "Failed to check existing key".to_string(),
223 })?
224 {
225 if let Ok(existing_stored) = Self::deserialize_value(&existing_bytes) {
227 if !existing_stored.is_expired() {
228 return Ok(false); }
230 }
231 }
233 }
234
235 let stored_value = StoredValue::new(value, options.ttl);
236 let serialized = Self::serialize_value(&stored_value)?;
237
238 db.insert(key.as_bytes(), serialized)
239 .into_alien_error()
240 .context(ErrorData::KvOperationFailed {
241 operation: "put".to_string(),
242 key: key.to_string(),
243 reason: "Failed to insert value into sled database".to_string(),
244 })?;
245
246 db.flush_async()
248 .await
249 .into_alien_error()
250 .context(ErrorData::KvOperationFailed {
251 operation: "flush".to_string(),
252 key: key.to_string(),
253 reason: "Failed to flush data to disk".to_string(),
254 })?;
255
256 tracing::info!(key = %key, data_dir = %self.data_dir.display(), "LocalKv::put completed successfully and flushed");
257
258 Ok(true)
259 }
260
261 async fn delete(&self, key: &str) -> Result<()> {
262 Self::validate_key(key)?;
263
264 let db = self.db.lock().await;
265 db.remove(key.as_bytes())
266 .into_alien_error()
267 .context(ErrorData::KvOperationFailed {
268 operation: "delete".to_string(),
269 key: key.to_string(),
270 reason: "Failed to remove key from sled database".to_string(),
271 })?;
272
273 db.flush_async()
275 .await
276 .into_alien_error()
277 .context(ErrorData::KvOperationFailed {
278 operation: "flush".to_string(),
279 key: key.to_string(),
280 reason: "Failed to flush deletion to disk".to_string(),
281 })?;
282
283 Ok(())
284 }
285
286 async fn exists(&self, key: &str) -> Result<bool> {
287 Self::validate_key(key)?;
288
289 let db = self.db.lock().await;
290
291 match db.get(key.as_bytes()) {
292 Ok(Some(bytes)) => {
293 let stored_value = Self::deserialize_value(&bytes)?;
294 if stored_value.is_expired() {
295 let _ = db.remove(key.as_bytes());
297 Ok(false)
298 } else {
299 Ok(true)
300 }
301 }
302 Ok(None) => Ok(false),
303 Err(e) => Err(e.into_alien_error().context(ErrorData::KvOperationFailed {
304 operation: "exists".to_string(),
305 key: key.to_string(),
306 reason: "Failed to check key existence in sled database".to_string(),
307 })),
308 }
309 }
310
311 async fn scan_prefix(
312 &self,
313 prefix: &str,
314 limit: Option<usize>,
315 cursor: Option<String>,
316 ) -> Result<ScanResult> {
317 Self::validate_key(prefix)?;
318
319 let db = self.db.lock().await;
320
321 let start_offset = if let Some(cursor_str) = cursor {
323 cursor_str.parse::<usize>().map_err(|_| {
324 AlienError::new(ErrorData::InvalidInput {
325 operation_context: "KV scan cursor parsing".to_string(),
326 details: format!("Invalid cursor format: {}", cursor_str),
327 field_name: Some("cursor".to_string()),
328 })
329 })?
330 } else {
331 0
332 };
333
334 let mut matching_items: Vec<(String, Vec<u8>)> = Vec::new();
336
337 for result in db.scan_prefix(prefix.as_bytes()) {
338 let (key_bytes, value_bytes) =
339 result
340 .into_alien_error()
341 .context(ErrorData::KvOperationFailed {
342 operation: "scan_prefix".to_string(),
343 key: prefix.to_string(),
344 reason: "Failed to scan prefix in sled database".to_string(),
345 })?;
346
347 let key = String::from_utf8(key_bytes.to_vec())
348 .into_alien_error()
349 .context(ErrorData::KvOperationFailed {
350 operation: "decode key".to_string(),
351 key: prefix.to_string(),
352 reason: "Invalid UTF-8 in stored key during scan".to_string(),
353 })?;
354
355 if let Ok(stored_value) = Self::deserialize_value(&value_bytes) {
356 if !stored_value.is_expired() {
357 matching_items.push((key, stored_value.value));
358 }
359 }
360 }
361
362 matching_items.sort_by(|a, b| a.0.cmp(&b.0));
364
365 let total_items = matching_items.len();
367 let end_offset = start_offset + limit.unwrap_or(total_items);
368
369 let items = matching_items
370 .into_iter()
371 .skip(start_offset)
372 .take(limit.unwrap_or(usize::MAX))
373 .collect::<Vec<_>>();
374
375 let next_cursor = if end_offset < total_items {
377 Some(end_offset.to_string())
378 } else {
379 None
380 };
381
382 Ok(ScanResult { items, next_cursor })
383 }
384}
385
386#[cfg(test)]
387mod tests {
388 use super::*;
389 use std::time::Duration;
390 use tempfile::TempDir;
391 use tokio::time;
392
393 async fn create_test_kv() -> (LocalKv, TempDir) {
394 let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
395 let kv = LocalKv::new(temp_dir.path().join("kv.db"))
396 .await
397 .expect("Failed to create LocalKv");
398 (kv, temp_dir)
399 }
400
401 #[tokio::test]
402 async fn test_basic_operations() {
403 let (kv, _temp_dir) = create_test_kv().await;
404
405 assert!(kv
407 .put("test_key", b"test_value".to_vec(), None)
408 .await
409 .unwrap());
410 let value = kv.get("test_key").await.unwrap();
411 assert_eq!(value, Some(b"test_value".to_vec()));
412
413 assert!(kv.exists("test_key").await.unwrap());
415 assert!(!kv.exists("nonexistent").await.unwrap());
416
417 kv.delete("test_key").await.unwrap();
419 assert!(!kv.exists("test_key").await.unwrap());
420 assert_eq!(kv.get("test_key").await.unwrap(), None);
421 }
422
423 #[tokio::test]
424 async fn test_conditional_put() {
425 let (kv, _temp_dir) = create_test_kv().await;
426
427 let options = Some(PutOptions {
429 ttl: None,
430 if_not_exists: true,
431 });
432 assert!(kv
433 .put("key", b"value1".to_vec(), options.clone())
434 .await
435 .unwrap());
436
437 assert!(!kv.put("key", b"value2".to_vec(), options).await.unwrap());
439
440 assert_eq!(kv.get("key").await.unwrap(), Some(b"value1".to_vec()));
442
443 assert!(kv.put("key", b"value3".to_vec(), None).await.unwrap());
445 assert_eq!(kv.get("key").await.unwrap(), Some(b"value3".to_vec()));
446 }
447
448 #[tokio::test]
449 async fn test_ttl_expiration() {
450 let (kv, _temp_dir) = create_test_kv().await;
451
452 let options = Some(PutOptions {
453 ttl: Some(Duration::from_millis(500)),
454 if_not_exists: false,
455 });
456
457 kv.put("expiring_key", b"value".to_vec(), options)
458 .await
459 .unwrap();
460
461 assert!(kv.exists("expiring_key").await.unwrap());
463 assert_eq!(
464 kv.get("expiring_key").await.unwrap(),
465 Some(b"value".to_vec())
466 );
467
468 time::sleep(Duration::from_millis(750)).await;
470
471 assert!(!kv.exists("expiring_key").await.unwrap());
473 assert_eq!(kv.get("expiring_key").await.unwrap(), None);
474 }
475
476 #[tokio::test]
477 async fn test_prefix_scanning() {
478 let (kv, _temp_dir) = create_test_kv().await;
479
480 kv.put("prefix:key1", b"value1".to_vec(), None)
482 .await
483 .unwrap();
484 kv.put("prefix:key2", b"value2".to_vec(), None)
485 .await
486 .unwrap();
487 kv.put("prefix:key3", b"value3".to_vec(), None)
488 .await
489 .unwrap();
490 kv.put("other:key", b"other".to_vec(), None).await.unwrap();
491
492 let result = kv.scan_prefix("prefix:", None, None).await.unwrap();
494 assert_eq!(result.items.len(), 3);
495 assert!(result.next_cursor.is_none());
496
497 assert_eq!(result.items[0].0, "prefix:key1");
499 assert_eq!(result.items[1].0, "prefix:key2");
500 assert_eq!(result.items[2].0, "prefix:key3");
501
502 let result = kv.scan_prefix("prefix:", Some(2), None).await.unwrap();
504 assert_eq!(result.items.len(), 2);
505 assert!(result.next_cursor.is_some());
506
507 let cursor = result.next_cursor.unwrap();
509 let result = kv
510 .scan_prefix("prefix:", Some(2), Some(cursor))
511 .await
512 .unwrap();
513 assert_eq!(result.items.len(), 1);
514 assert_eq!(result.items[0].0, "prefix:key3");
515 assert!(result.next_cursor.is_none());
516 }
517
518 #[tokio::test]
519 async fn test_persistence_across_reopens() {
520 let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
521 let db_path = temp_dir.path().join("kv.db");
522
523 {
525 let kv = LocalKv::new(db_path.clone())
526 .await
527 .expect("Failed to create LocalKv");
528 kv.put("persistent_key", b"persistent_value".to_vec(), None)
529 .await
530 .unwrap();
531 }
532
533 {
535 let kv = LocalKv::new(db_path)
536 .await
537 .expect("Failed to reopen LocalKv");
538 let value = kv.get("persistent_key").await.unwrap();
539 assert_eq!(value, Some(b"persistent_value".to_vec()));
540 }
541 }
542
543 #[tokio::test]
544 async fn test_key_validation() {
545 let (kv, _temp_dir) = create_test_kv().await;
546
547 assert!(kv.put("", b"value".to_vec(), None).await.is_err());
549 assert!(kv.get("").await.is_err());
550
551 let long_key = "a".repeat(513);
553 assert!(kv.put(&long_key, b"value".to_vec(), None).await.is_err());
554
555 assert!(kv
557 .put("key with spaces", b"value".to_vec(), None)
558 .await
559 .is_err());
560 assert!(kv
561 .put("key\nwith\nnewlines", b"value".to_vec(), None)
562 .await
563 .is_err());
564 assert!(kv
565 .put("key/with/slashes", b"value".to_vec(), None)
566 .await
567 .is_err());
568
569 assert!(kv
571 .put("valid_key-123", b"value".to_vec(), None)
572 .await
573 .is_ok());
574 assert!(kv
575 .put("domain.com:8080", b"value".to_vec(), None)
576 .await
577 .is_ok());
578 }
579
580 #[tokio::test]
581 async fn test_value_validation() {
582 let (kv, _temp_dir) = create_test_kv().await;
583
584 let large_value = vec![0u8; 24_577]; assert!(kv.put("key", large_value, None).await.is_err());
587
588 let max_value = vec![0u8; 24_576]; assert!(kv.put("key", max_value, None).await.is_ok());
591 }
592
593 #[tokio::test]
594 async fn test_utility_methods() {
595 let (kv, _temp_dir) = create_test_kv().await;
596
597 assert!(kv.is_empty().await.unwrap());
599 assert_eq!(kv.len().await.unwrap(), 0);
600 assert_eq!(kv.keys().await.unwrap(), Vec::<String>::new());
601
602 kv.put("key1", b"value1".to_vec(), None).await.unwrap();
604 kv.put("key2", b"value2".to_vec(), None).await.unwrap();
605
606 assert!(!kv.is_empty().await.unwrap());
607 assert_eq!(kv.len().await.unwrap(), 2);
608
609 let mut keys = kv.keys().await.unwrap();
610 keys.sort();
611 assert_eq!(keys, vec!["key1", "key2"]);
612
613 kv.clear().await.unwrap();
615 assert!(kv.is_empty().await.unwrap());
616 assert_eq!(kv.len().await.unwrap(), 0);
617 }
618}