Skip to main content

s2_lite/backend/
store.rs

1use bytes::Bytes;
2use slatedb::{
3    DbTransaction,
4    config::{DurabilityLevel, ReadOptions},
5};
6
7use super::Backend;
8use crate::backend::{error::StorageError, kv};
9
10impl Backend {
11    // TODO: switch to `self.db.status()` once slatedb releases with
12    // https://github.com/slatedb/slatedb/pull/1234
13    pub async fn db_status(&self) -> Result<(), slatedb::Error> {
14        let _ = self.db.get(b"ping").await?;
15        Ok(())
16    }
17
18    pub(super) async fn db_get<K: AsRef<[u8]> + Send, V>(
19        &self,
20        key: K,
21        deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
22    ) -> Result<Option<V>, StorageError> {
23        static READ_OPTS: ReadOptions = ReadOptions {
24            durability_filter: DurabilityLevel::Remote,
25            dirty: false,
26            cache_blocks: true,
27        };
28        let value = self
29            .db
30            .get_with_options(key, &READ_OPTS)
31            .await?
32            .map(deser)
33            .transpose()?;
34        Ok(value)
35    }
36}
37
38pub(super) async fn db_txn_get<K: AsRef<[u8]> + Send, V>(
39    txn: &DbTransaction,
40    key: K,
41    deser: impl FnOnce(Bytes) -> Result<V, kv::DeserializationError>,
42) -> Result<Option<V>, StorageError> {
43    static READ_OPTS: ReadOptions = ReadOptions {
44        durability_filter: DurabilityLevel::Memory,
45        dirty: false,
46        cache_blocks: true,
47    };
48    let value = txn
49        .get_with_options(key, &READ_OPTS)
50        .await?
51        .map(deser)
52        .transpose()?;
53    Ok(value)
54}