balius_runtime/kv/
redb.rs

/// Redb-backed implementation of KV, stored in a database file at a given path (an in-memory copy can be obtained via `RedbKv::into_ephemeral`).
2use std::{path::Path, sync::Arc};
3
4use crate::wit::balius::app::kv as wit;
5use redb::{Database, Durability, ReadableTable, TableDefinition};
6use tracing::warn;
7use wit::{KvError, Payload};
8
9use super::KvProvider;
10use crate::Error;
11
12#[derive(Clone)]
13pub struct RedbKv {
14    db: Arc<Database>,
15}
16
17impl RedbKv {
18    pub const DEF: TableDefinition<'static, String, Vec<u8>> = TableDefinition::new("kv");
19    pub fn try_new(path: impl AsRef<Path>, cache_size: Option<usize>) -> Result<Self, Error> {
20        let db = Database::builder()
21            .set_repair_callback(|x| warn!(progress = x.progress() * 100f64, "db is repairing"))
22            .set_cache_size(1024 * 1024 * cache_size.unwrap_or(10_000))
23            .create(path)
24            .map_err(|err| Error::KvError(err.to_string()))?;
25
26        let mut wx = db
27            .begin_write()
28            .map_err(|err| Error::KvError(err.to_string()))?;
29        wx.set_durability(Durability::Immediate);
30        wx.open_table(Self::DEF)
31            .map_err(|err| Error::KvError(err.to_string()))?;
32        wx.commit().map_err(|err| Error::KvError(err.to_string()))?;
33
34        Ok(Self { db: Arc::new(db) })
35    }
36
37    pub fn key_for_worker(worker_id: &str, key: &str) -> String {
38        format!("{worker_id}-{key}")
39    }
40
41    pub fn into_ephemeral(&mut self) -> Result<Self, Error> {
42        let new_db = redb::Database::builder()
43            .create_with_backend(redb::backends::InMemoryBackend::new())
44            .map_err(|e| Error::KvError(e.to_string()))?;
45
46        let rx = self.db.begin_read()?;
47        let wx = new_db.begin_write()?;
48
49        {
50            if let Ok(source) = rx.open_table(Self::DEF) {
51                let mut target = wx
52                    .open_table(Self::DEF)
53                    .map_err(|e| Error::KvError(e.to_string()))?;
54
55                for entry in source.iter().map_err(|e| Error::KvError(e.to_string()))? {
56                    let (k, v) = entry.map_err(|e| Error::KvError(e.to_string()))?;
57                    target
58                        .insert(k.value(), v.value())
59                        .map_err(|e| Error::KvError(e.to_string()))?;
60                }
61            };
62        }
63
64        wx.commit().map_err(|e| Error::KvError(e.to_string()))?;
65
66        let new = Self {
67            db: Arc::new(new_db),
68        };
69
70        Ok(new)
71    }
72}
73
74#[async_trait::async_trait]
75impl KvProvider for RedbKv {
76    async fn get_value(&mut self, worker_id: &str, key: String) -> Result<Payload, KvError> {
77        let rx = self
78            .db
79            .begin_read()
80            .map_err(|err| KvError::Internal(err.to_string()))?;
81
82        let table = rx
83            .open_table(Self::DEF)
84            .map_err(|err| KvError::Internal(err.to_string()))?;
85        match table
86            .get(Self::key_for_worker(worker_id, &key))
87            .map_err(|err| KvError::Internal(err.to_string()))?
88        {
89            Some(value) => Ok(value.value()),
90            None => Err(KvError::NotFound(key)),
91        }
92    }
93
94    async fn set_value(
95        &mut self,
96        worker_id: &str,
97        key: String,
98        value: Payload,
99    ) -> Result<(), KvError> {
100        let wx = self
101            .db
102            .begin_write()
103            .map_err(|err| KvError::Internal(err.to_string()))?;
104
105        {
106            let mut table = wx
107                .open_table(Self::DEF)
108                .map_err(|err| KvError::Internal(err.to_string()))?;
109
110            table
111                .insert(Self::key_for_worker(worker_id, &key), value)
112                .map_err(|err| KvError::Internal(err.to_string()))?;
113        }
114
115        wx.commit()
116            .map_err(|err| KvError::Internal(err.to_string()))?;
117
118        Ok(())
119    }
120
121    async fn list_values(
122        &mut self,
123        worker_id: &str,
124        prefix: String,
125    ) -> Result<Vec<String>, KvError> {
126        let rx = self
127            .db
128            .begin_read()
129            .map_err(|err| KvError::Internal(err.to_string()))?;
130
131        let table = rx
132            .open_table(Self::DEF)
133            .map_err(|err| KvError::Internal(err.to_string()))?;
134
135        let mut result = vec![];
136        let range = table
137            .range(Self::key_for_worker(worker_id, &prefix)..)
138            .map_err(|err| KvError::Internal(err.to_string()))?;
139
140        for item in range {
141            let (k, _) = item.unwrap();
142            if k.value()
143                .starts_with(&Self::key_for_worker(worker_id, &prefix))
144            {
145                result.push(k.value());
146            } else {
147                break;
148            }
149        }
150        Ok(result)
151    }
152}