1use async_trait::async_trait;
4use bytes::Bytes;
5use futures::stream;
6use sled::Db;
7
8use crate::DatastoreConfig;
9use helia_interface::*;
10
11pub struct SledDatastore {
13 db: Db,
14}
15
16impl SledDatastore {
17 pub fn new(config: DatastoreConfig) -> Result<Self, HeliaError> {
18 let db = if let Some(path) = config.path {
19 sled::open(path)
20 .map_err(|e| HeliaError::datastore(format!("Failed to open datastore: {}", e)))?
21 } else {
22 sled::Config::new().temporary(true).open().map_err(|e| {
23 HeliaError::datastore(format!("Failed to create temporary datastore: {}", e))
24 })?
25 };
26
27 Ok(Self { db })
28 }
29}
30
31#[async_trait]
32impl Datastore for SledDatastore {
33 async fn get(&self, key: &[u8]) -> Result<Option<Bytes>, HeliaError> {
34 match self.db.get(key) {
35 Ok(Some(data)) => Ok(Some(Bytes::from(data.to_vec()))),
36 Ok(None) => Ok(None),
37 Err(e) => Err(HeliaError::datastore(format!("Datastore get error: {}", e))),
38 }
39 }
40
41 async fn put(&self, key: &[u8], value: Bytes) -> Result<(), HeliaError> {
42 self.db
43 .insert(key, value.as_ref())
44 .map_err(|e| HeliaError::datastore(format!("Datastore put error: {}", e)))?;
45 Ok(())
46 }
47
48 async fn delete(&self, key: &[u8]) -> Result<(), HeliaError> {
49 self.db
50 .remove(key)
51 .map_err(|e| HeliaError::datastore(format!("Datastore delete error: {}", e)))?;
52 Ok(())
53 }
54
55 async fn has(&self, key: &[u8]) -> Result<bool, HeliaError> {
56 match self.db.contains_key(key) {
57 Ok(exists) => Ok(exists),
58 Err(e) => Err(HeliaError::datastore(format!("Datastore has error: {}", e))),
59 }
60 }
61
62 async fn query(&self, prefix: Option<&[u8]>) -> Result<AwaitIterable<Bytes>, HeliaError> {
63 let mut results = Vec::new();
64
65 if let Some(prefix) = prefix {
66 for item in self.db.scan_prefix(prefix) {
68 match item {
69 Ok((_key, value)) => {
70 results.push(Bytes::from(value.to_vec()));
71 }
72 Err(e) => {
73 return Err(HeliaError::datastore(format!("Query error: {}", e)));
74 }
75 }
76 }
77 } else {
78 for item in self.db.iter() {
80 match item {
81 Ok((_key, value)) => {
82 results.push(Bytes::from(value.to_vec()));
83 }
84 Err(e) => {
85 return Err(HeliaError::datastore(format!("Query error: {}", e)));
86 }
87 }
88 }
89 }
90
91 Ok(Box::pin(stream::iter(results)))
92 }
93}