helia_utils/
datastore.rs

1//! Datastore implementations
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use futures::stream;
6use sled::Db;
7
8use crate::DatastoreConfig;
9use helia_interface::*;
10
11/// Sled-based datastore implementation
12pub struct SledDatastore {
13    db: Db,
14}
15
16impl SledDatastore {
17    pub fn new(config: DatastoreConfig) -> Result<Self, HeliaError> {
18        let db = if let Some(path) = config.path {
19            sled::open(path)
20                .map_err(|e| HeliaError::datastore(format!("Failed to open datastore: {}", e)))?
21        } else {
22            sled::Config::new().temporary(true).open().map_err(|e| {
23                HeliaError::datastore(format!("Failed to create temporary datastore: {}", e))
24            })?
25        };
26
27        Ok(Self { db })
28    }
29}
30
31#[async_trait]
32impl Datastore for SledDatastore {
33    async fn get(&self, key: &[u8]) -> Result<Option<Bytes>, HeliaError> {
34        match self.db.get(key) {
35            Ok(Some(data)) => Ok(Some(Bytes::from(data.to_vec()))),
36            Ok(None) => Ok(None),
37            Err(e) => Err(HeliaError::datastore(format!("Datastore get error: {}", e))),
38        }
39    }
40
41    async fn put(&self, key: &[u8], value: Bytes) -> Result<(), HeliaError> {
42        self.db
43            .insert(key, value.as_ref())
44            .map_err(|e| HeliaError::datastore(format!("Datastore put error: {}", e)))?;
45        Ok(())
46    }
47
48    async fn delete(&self, key: &[u8]) -> Result<(), HeliaError> {
49        self.db
50            .remove(key)
51            .map_err(|e| HeliaError::datastore(format!("Datastore delete error: {}", e)))?;
52        Ok(())
53    }
54
55    async fn has(&self, key: &[u8]) -> Result<bool, HeliaError> {
56        match self.db.contains_key(key) {
57            Ok(exists) => Ok(exists),
58            Err(e) => Err(HeliaError::datastore(format!("Datastore has error: {}", e))),
59        }
60    }
61
62    async fn query(&self, prefix: Option<&[u8]>) -> Result<AwaitIterable<Bytes>, HeliaError> {
63        let mut results = Vec::new();
64
65        if let Some(prefix) = prefix {
66            // Iterate through keys with the given prefix
67            for item in self.db.scan_prefix(prefix) {
68                match item {
69                    Ok((_key, value)) => {
70                        results.push(Bytes::from(value.to_vec()));
71                    }
72                    Err(e) => {
73                        return Err(HeliaError::datastore(format!("Query error: {}", e)));
74                    }
75                }
76            }
77        } else {
78            // Iterate through all keys
79            for item in self.db.iter() {
80                match item {
81                    Ok((_key, value)) => {
82                        results.push(Bytes::from(value.to_vec()));
83                    }
84                    Err(e) => {
85                        return Err(HeliaError::datastore(format!("Query error: {}", e)));
86                    }
87                }
88            }
89        }
90
91        Ok(Box::pin(stream::iter(results)))
92    }
93}